Disputa de dados com pools do Apache Spark (preterido)
APLICA-SE A: Python SDK azureml v1
Aviso
A integração do Azure Synapse Analytics com o Azure Machine Learning, disponível no Python SDK v1, foi preterida. Os usuários ainda podem usar o espaço de trabalho Synapse, registrado no Azure Machine Learning, como um serviço vinculado. No entanto, já não pode ser registada uma nova área de trabalho do Synapse no Azure Machine Learning como um serviço associado. Recomendamos o uso de computação Spark sem servidor e pools Synapse Spark anexados, disponíveis na CLI v2 e Python SDK v2. Para mais informações, visite https://aka.ms/aml-spark.
Neste artigo, você aprenderá a executar interativamente tarefas de disputa de dados em uma sessão Synapse dedicada, alimentada pelo Azure Synapse Analytics, em um bloco de anotações Jupyter. Essas tarefas dependem do SDK Python do Azure Machine Learning. Para obter mais informações sobre pipelines do Azure Machine Learning, visite Como usar o Apache Spark (com tecnologia do Azure Synapse Analytics) em seu pipeline de aprendizado de máquina (visualização). Para obter mais informações sobre como usar o Azure Synapse Analytics com um espaço de trabalho Synapse, visite a série de introdução do Azure Synapse Analytics.
Integração do Azure Machine Learning e do Azure Synapse Analytics
Com a integração do Azure Synapse Analytics com o Azure Machine Learning (visualização), você pode anexar um pool do Apache Spark, apoiado pelo Azure Synapse, para exploração e preparação de dados interativos. Com essa integração, você pode ter um recurso de computação dedicado para disputa de dados em escala, tudo dentro do mesmo notebook Python que você usa para treinar seus modelos de aprendizado de máquina.
Pré-requisitos
Configure seu ambiente de desenvolvimento para instalar o SDK do Azure Machine Learning ou use uma instância de computação do Azure Machine Learning com o SDK já instalado
Instalar o SDK Python do Azure Machine Learning
Crie um pool do Apache Spark usando o portal do Azure, ferramentas da Web ou o Synapse Studio
Instale o
azureml-synapse
pacote (visualização) com este código:pip install azureml-synapse
Vincule seu espaço de trabalho do Azure Machine Learning e o espaço de trabalho do Azure Synapse Analytics ao SDK Python do Azure Machine Learning ou ao estúdio do Azure Machine Learning
Anexar um pool Synapse Spark como um destino de computação
Inicie o pool Synapse Spark para tarefas de disputa de dados
Para iniciar a preparação de dados com o pool do Apache Spark, especifique o nome de computação do Spark Synapse anexado. Você pode encontrar esse nome com o estúdio do Azure Machine Learning na guia Cálculos anexados.
Importante
Para continuar a usar o pool do Apache Spark, você deve indicar qual recurso de computação usar em todas as suas tarefas de disputa de dados. Use %synapse
para linhas únicas de código e %%synapse
para várias linhas:
%synapse start -c SynapseSparkPoolAlias
Após o início da sessão, você pode verificar os metadados da sessão:
%synapse meta
Você pode especificar um ambiente do Azure Machine Learning para usar durante sua sessão do Apache Spark. Somente as dependências do Conda especificadas no ambiente entrarão em vigor. Não há suporte para imagens do Docker.
Aviso
As dependências Python especificadas nas dependências do ambiente Conda não são suportadas nos pools do Apache Spark. Atualmente, apenas versões fixas do Python são suportadas Inclua sys.version_info
no seu script para verificar a sua versão do Python
Este código cria a variável de ambiente, para instalar azureml-core
amyenv
versão 1.20.0 e numpy
a versão 1.17.0 antes do início da sessão. Em seguida, você pode incluir esse ambiente em sua instrução de sessão start
do Apache Spark.
from azureml.core import Workspace, Environment
# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
Para iniciar a preparação de dados com o pool do Apache Spark em seu ambiente personalizado, especifique o nome do pool do Apache Spark e o ambiente a ser usado durante a sessão do Apache Spark. Você pode fornecer sua ID de assinatura, o grupo de recursos do espaço de trabalho de aprendizado de máquina e o nome do espaço de trabalho de aprendizado de máquina.
Importante
Certifique-se de ativar Permitir pacotes de nível de sessão no espaço de trabalho Synapse vinculado.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
Carregar dados do armazenamento
Depois que a sessão do Apache Spark for iniciada, leia os dados que você deseja preparar. O carregamento de dados tem suporte para o armazenamento de Blob do Azure e para as Gerações 1 e 2 do Armazenamento do Azure Data Lake.
Você tem duas opções para carregar dados desses serviços de armazenamento:
Carregue dados diretamente do armazenamento com o caminho do Hadoop Distributed Files System (HDFS)
Ler dados de um conjunto de dados existente do Azure Machine Learning
Para acessar esses serviços de armazenamento, você precisa de permissões do Storage Blob Data Reader . Para gravar dados de volta nesses serviços de armazenamento, você precisa de permissões de Colaborador de Dados de Blob de Armazenamento . Saiba mais sobre permissões e funções de armazenamento.
Carregar dados com o caminho do Hadoop Distributed Files System (HDFS)
Para carregar e ler dados do armazenamento com o caminho HDFS correspondente, você precisa de suas credenciais de autenticação de acesso a dados disponíveis. Essas credenciais diferem dependendo do seu tipo de armazenamento. Este exemplo de código mostra como ler dados de um armazenamento de Blob do Azure em um dataframe do Spark com seu token de assinatura de acesso compartilhado (SAS) ou chave de acesso:
%%synapse
# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")
# read from blob
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")
Este exemplo de código mostra como ler dados do Azure Data Lake Storage Generation 1 (ADLS Gen 1) com suas credenciais de entidade de serviço:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
Este exemplo de código mostra como ler dados do Azure Data Lake Storage Generation 2 (ADLS Gen 2) com suas credenciais de entidade de serviço:
%%synapse
# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")
df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")
Ler dados de conjuntos de dados registados
Você também pode colocar um conjunto de dados registrado existente em seu espaço de trabalho e executar a preparação de dados nele, se você convertê-lo em um dataframe de faísca. Este exemplo autentica no espaço de trabalho, obtém um TabularDataset registrado -blob_dset
- que faz referência a arquivos no armazenamento de blob e converte esse TabularDataset em um dataframe Spark. Ao converter seus conjuntos de dados em dataframeS do Spark, você pode usar pyspark
bibliotecas de exploração e preparação de dados.
%%synapse
from azureml.core import Workspace, Dataset
subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"
ws = Workspace(workspace_name = workspace_name,
subscription_id = subscription_id,
resource_group = resource_group)
dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Executar tarefas de disputa de dados
Depois de recuperar e explorar seus dados, você pode executar tarefas de disputa de dados. Este exemplo de código expande o exemplo HDFS na seção anterior. Com base na coluna Sobrevivente , ele filtra os dados no dataframe df
de faísca e nos grupos que listam por Idade:
%%synapse
from pyspark.sql.functions import col, desc
df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)
df.show()
Salve dados no armazenamento e interrompa a sessão de faísca
Quando a exploração e a preparação de dados estiverem concluídas, armazene os dados preparados para uso posterior em sua conta de armazenamento no Azure. Neste exemplo de código, os dados preparados são gravados de volta no armazenamento de Blob do Azure, substituindo o arquivo original Titanic.csv
no training_data
diretório. Para gravar novamente no armazenamento, você precisa de permissões de Colaborador de Dados de Blob de Armazenamento . Para obter mais informações, visite Atribuir uma função do Azure para acesso a dados de blob.
%% synapse
df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")
Depois de concluir a preparação de dados e salvar os dados preparados no armazenamento, encerre o uso do pool do Apache Spark com este comando:
%synapse stop
Criar um conjunto de dados, para representar dados preparados
Quando estiver pronto para consumir seus dados preparados para treinamento de modelo, conecte-se ao seu armazenamento com um armazenamento de dados do Azure Machine Learning e especifique o arquivo ou arquivo que deseja usar com um conjunto de dados do Azure Machine Learning.
Este exemplo de código
- Supõe que você já criou um armazenamento de dados que se conecta ao serviço de armazenamento onde você salvou os dados preparados
- Recupera esse armazenamento de dados existente -
mydatastore
- do espaço de trabalhows
com o método get(). - Cria um FileDataset, , para fazer referência aos arquivos de dados preparados localizados no
mydatastore
training_data
diretóriotrain_ds
- Cria a variável
input1
. Em um momento posterior, essa variável pode disponibilizar os arquivos de dados dotrain_ds
conjunto de dados para um destino de computação para suas tarefas de treinamento.
from azureml.core import Datastore, Dataset
datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Use a ScriptRunConfig
para enviar uma execução de experimento para um pool Synapse Spark
Se você estiver pronto para automatizar e produzir suas tarefas de disputa de dados, poderá enviar uma execução de experimento para um pool Synapse Spark anexado com o objeto ScriptRunConfig . Da mesma forma, se você tiver um pipeline do Azure Machine Learning, poderá usar o SynapseSparkStep para especificar seu pool Synapse Spark como o destino de computação para a etapa de preparação de dados em seu pipeline. A disponibilidade de seus dados para o pool Synapse Spark depende do tipo de conjunto de dados.
- Para um FileDataset, você pode usar o
as_hdfs()
método. Quando a execução é enviada, o conjunto de dados é disponibilizado para o pool do Synapse Spark como um sistema de arquivos distribuídos Hadoop (HFDS) - Para um TabularDataset, você pode usar o
as_named_input()
método
O exemplo de código a seguir
- Cria variável
input2
a partir do FileDatasettrain_ds
, ele próprio criado no exemplo de código anterior - Cria variável
output
com aHDFSOutputDatasetConfiguration
classe. Depois que a execução for concluída, essa classe nos permite salvar a saída da execução como o conjunto de dados,test
nomydatastore
armazenamento de dados. No espaço de trabalho do Azure Machine Learning, otest
conjunto de dados é registrado sob o nomeregistered_dataset
- Define as configurações que a execução deve usar para executar no pool Synapse Spark
- Define os parâmetros ScriptRunConfig para
- Use o
dataprep.py
script para a execução - Especifique os dados a serem usados como entrada e como disponibilizá-los para o pool Synapse Spark
- Especificar onde armazenar os
output
dados de saída
- Use o
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name
run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1
conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")
run_config.environment.python.conda_dependencies = conda_dep
script_run_config = ScriptRunConfig(source_directory = './code',
script= 'dataprep.py',
arguments = ["--file_input", input2,
"--output_dir", output],
run_config = run_config)
Para obter mais informações sobre run_config.spark.configuration
a configuração geral do Spark, visite a Classe SparkConfiguration e a documentação de configuração do Apache Spark.
Depois de configurar seu ScriptRunConfig
objeto, você pode enviar a execução.
from azureml.core import Experiment
exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
Para obter mais informações, incluindo informações sobre o dataprep.py
script usado neste exemplo, consulte o bloco de anotações de exemplo.
Depois de preparar seus dados, você pode usá-los como entrada para seus trabalhos de treinamento. No exemplo de código acima, você especificaria os registered_dataset
como seus dados de entrada para trabalhos de treinamento.
Blocos de notas de exemplo
Analise estes blocos de anotações de exemplo para obter mais conceitos e demonstrações dos recursos de integração do Azure Synapse Analytics e do Azure Machine Learning:
- Execute uma sessão interativa do Spark a partir de um bloco de notas na sua área de trabalho do Azure Machine Learning.
- Envie um experimento do Azure Machine Learning executado com um pool do Synapse Spark como seu destino de computação.
Próximos passos
- Treine um modelo.
- Treine com o conjunto de dados do Azure Machine Learning.