Partilhar via


Disputa de dados com pools do Apache Spark (preterido)

APLICA-SE A: Python SDK azureml v1

Aviso

A integração do Azure Synapse Analytics com o Azure Machine Learning, disponível no Python SDK v1, foi preterida. Os usuários ainda podem usar o espaço de trabalho Synapse, registrado no Azure Machine Learning, como um serviço vinculado. No entanto, já não pode ser registada uma nova área de trabalho do Synapse no Azure Machine Learning como um serviço associado. Recomendamos o uso de computação Spark sem servidor e pools Synapse Spark anexados, disponíveis na CLI v2 e Python SDK v2. Para mais informações, visite https://aka.ms/aml-spark.

Neste artigo, você aprenderá a executar interativamente tarefas de disputa de dados em uma sessão Synapse dedicada, alimentada pelo Azure Synapse Analytics, em um bloco de anotações Jupyter. Essas tarefas dependem do SDK Python do Azure Machine Learning. Para obter mais informações sobre pipelines do Azure Machine Learning, visite Como usar o Apache Spark (com tecnologia do Azure Synapse Analytics) em seu pipeline de aprendizado de máquina (visualização). Para obter mais informações sobre como usar o Azure Synapse Analytics com um espaço de trabalho Synapse, visite a série de introdução do Azure Synapse Analytics.

Integração do Azure Machine Learning e do Azure Synapse Analytics

Com a integração do Azure Synapse Analytics com o Azure Machine Learning (visualização), você pode anexar um pool do Apache Spark, apoiado pelo Azure Synapse, para exploração e preparação de dados interativos. Com essa integração, você pode ter um recurso de computação dedicado para disputa de dados em escala, tudo dentro do mesmo notebook Python que você usa para treinar seus modelos de aprendizado de máquina.

Pré-requisitos

Inicie o pool Synapse Spark para tarefas de disputa de dados

Para iniciar a preparação de dados com o pool do Apache Spark, especifique o nome de computação do Spark Synapse anexado. Você pode encontrar esse nome com o estúdio do Azure Machine Learning na guia Cálculos anexados.

Obter nome de computação anexado

Importante

Para continuar a usar o pool do Apache Spark, você deve indicar qual recurso de computação usar em todas as suas tarefas de disputa de dados. Use %synapse para linhas únicas de código e %%synapse para várias linhas:

%synapse start -c SynapseSparkPoolAlias

Após o início da sessão, você pode verificar os metadados da sessão:

%synapse meta

Você pode especificar um ambiente do Azure Machine Learning para usar durante sua sessão do Apache Spark. Somente as dependências do Conda especificadas no ambiente entrarão em vigor. Não há suporte para imagens do Docker.

Aviso

As dependências Python especificadas nas dependências do ambiente Conda não são suportadas nos pools do Apache Spark. Atualmente, apenas versões fixas do Python são suportadas Inclua sys.version_info no seu script para verificar a sua versão do Python

Este código cria a variável de ambiente, para instalar azureml-core amyenv versão 1.20.0 e numpy a versão 1.17.0 antes do início da sessão. Em seguida, você pode incluir esse ambiente em sua instrução de sessão start do Apache Spark.


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Para iniciar a preparação de dados com o pool do Apache Spark em seu ambiente personalizado, especifique o nome do pool do Apache Spark e o ambiente a ser usado durante a sessão do Apache Spark. Você pode fornecer sua ID de assinatura, o grupo de recursos do espaço de trabalho de aprendizado de máquina e o nome do espaço de trabalho de aprendizado de máquina.

Importante

Certifique-se de ativar Permitir pacotes de nível de sessão no espaço de trabalho Synapse vinculado.

habilitar pacotes de nível de sessão

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Carregar dados do armazenamento

Depois que a sessão do Apache Spark for iniciada, leia os dados que você deseja preparar. O carregamento de dados tem suporte para o armazenamento de Blob do Azure e para as Gerações 1 e 2 do Armazenamento do Azure Data Lake.

Você tem duas opções para carregar dados desses serviços de armazenamento:

  • Carregue dados diretamente do armazenamento com o caminho do Hadoop Distributed Files System (HDFS)

  • Ler dados de um conjunto de dados existente do Azure Machine Learning

Para acessar esses serviços de armazenamento, você precisa de permissões do Storage Blob Data Reader . Para gravar dados de volta nesses serviços de armazenamento, você precisa de permissões de Colaborador de Dados de Blob de Armazenamento . Saiba mais sobre permissões e funções de armazenamento.

Carregar dados com o caminho do Hadoop Distributed Files System (HDFS)

Para carregar e ler dados do armazenamento com o caminho HDFS correspondente, você precisa de suas credenciais de autenticação de acesso a dados disponíveis. Essas credenciais diferem dependendo do seu tipo de armazenamento. Este exemplo de código mostra como ler dados de um armazenamento de Blob do Azure em um dataframe do Spark com seu token de assinatura de acesso compartilhado (SAS) ou chave de acesso:

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Este exemplo de código mostra como ler dados do Azure Data Lake Storage Generation 1 (ADLS Gen 1) com suas credenciais de entidade de serviço:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Este exemplo de código mostra como ler dados do Azure Data Lake Storage Generation 2 (ADLS Gen 2) com suas credenciais de entidade de serviço:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Ler dados de conjuntos de dados registados

Você também pode colocar um conjunto de dados registrado existente em seu espaço de trabalho e executar a preparação de dados nele, se você convertê-lo em um dataframe de faísca. Este exemplo autentica no espaço de trabalho, obtém um TabularDataset registrado -blob_dset - que faz referência a arquivos no armazenamento de blob e converte esse TabularDataset em um dataframe Spark. Ao converter seus conjuntos de dados em dataframeS do Spark, você pode usar pyspark bibliotecas de exploração e preparação de dados.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Executar tarefas de disputa de dados

Depois de recuperar e explorar seus dados, você pode executar tarefas de disputa de dados. Este exemplo de código expande o exemplo HDFS na seção anterior. Com base na coluna Sobrevivente , ele filtra os dados no dataframe df de faísca e nos grupos que listam por Idade:

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Salve dados no armazenamento e interrompa a sessão de faísca

Quando a exploração e a preparação de dados estiverem concluídas, armazene os dados preparados para uso posterior em sua conta de armazenamento no Azure. Neste exemplo de código, os dados preparados são gravados de volta no armazenamento de Blob do Azure, substituindo o arquivo original Titanic.csv no training_data diretório. Para gravar novamente no armazenamento, você precisa de permissões de Colaborador de Dados de Blob de Armazenamento . Para obter mais informações, visite Atribuir uma função do Azure para acesso a dados de blob.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

Depois de concluir a preparação de dados e salvar os dados preparados no armazenamento, encerre o uso do pool do Apache Spark com este comando:

%synapse stop

Criar um conjunto de dados, para representar dados preparados

Quando estiver pronto para consumir seus dados preparados para treinamento de modelo, conecte-se ao seu armazenamento com um armazenamento de dados do Azure Machine Learning e especifique o arquivo ou arquivo que deseja usar com um conjunto de dados do Azure Machine Learning.

Este exemplo de código

  • Supõe que você já criou um armazenamento de dados que se conecta ao serviço de armazenamento onde você salvou os dados preparados
  • Recupera esse armazenamento de dados existente - mydatastore - do espaço de trabalho ws com o método get().
  • Cria um FileDataset, , para fazer referência aos arquivos de dados preparados localizados no mydatastore training_data diretório train_ds
  • Cria a variável input1. Em um momento posterior, essa variável pode disponibilizar os arquivos de dados do train_ds conjunto de dados para um destino de computação para suas tarefas de treinamento.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Use a ScriptRunConfig para enviar uma execução de experimento para um pool Synapse Spark

Se você estiver pronto para automatizar e produzir suas tarefas de disputa de dados, poderá enviar uma execução de experimento para um pool Synapse Spark anexado com o objeto ScriptRunConfig . Da mesma forma, se você tiver um pipeline do Azure Machine Learning, poderá usar o SynapseSparkStep para especificar seu pool Synapse Spark como o destino de computação para a etapa de preparação de dados em seu pipeline. A disponibilidade de seus dados para o pool Synapse Spark depende do tipo de conjunto de dados.

  • Para um FileDataset, você pode usar o as_hdfs() método. Quando a execução é enviada, o conjunto de dados é disponibilizado para o pool do Synapse Spark como um sistema de arquivos distribuídos Hadoop (HFDS)
  • Para um TabularDataset, você pode usar o as_named_input() método

O exemplo de código a seguir

  • Cria variável input2 a partir do FileDataset train_ds, ele próprio criado no exemplo de código anterior
  • Cria variável output com a HDFSOutputDatasetConfiguration classe. Depois que a execução for concluída, essa classe nos permite salvar a saída da execução como o conjunto de dados, test no mydatastore armazenamento de dados. No espaço de trabalho do Azure Machine Learning, o test conjunto de dados é registrado sob o nome registered_dataset
  • Define as configurações que a execução deve usar para executar no pool Synapse Spark
  • Define os parâmetros ScriptRunConfig para
    • Use o dataprep.py script para a execução
    • Especifique os dados a serem usados como entrada e como disponibilizá-los para o pool Synapse Spark
    • Especificar onde armazenar os output dados de saída
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Para obter mais informações sobre run_config.spark.configuration a configuração geral do Spark, visite a Classe SparkConfiguration e a documentação de configuração do Apache Spark.

Depois de configurar seu ScriptRunConfig objeto, você pode enviar a execução.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Para obter mais informações, incluindo informações sobre o dataprep.py script usado neste exemplo, consulte o bloco de anotações de exemplo.

Depois de preparar seus dados, você pode usá-los como entrada para seus trabalhos de treinamento. No exemplo de código acima, você especificaria os registered_dataset como seus dados de entrada para trabalhos de treinamento.

Blocos de notas de exemplo

Analise estes blocos de anotações de exemplo para obter mais conceitos e demonstrações dos recursos de integração do Azure Synapse Analytics e do Azure Machine Learning:

Próximos passos