Exercício – Integrar um Notebook nos pipelines do Azure Synapse
Nesta unidade, você cria um notebook Spark do Azure Synapse para analisar e transformar dados carregados por um fluxo de dados de mapeamento e armazenar os dados em um data lake. Você cria uma célula de parâmetro que aceita um parâmetro de cadeia de caracteres que define o nome da pasta para os dados que o notebook grava no data lake.
Em seguida, você adiciona esse notebook a um pipeline do Synapse e passa a ID de execução de pipeline exclusiva para o parâmetro do notebook, para que possa posteriormente você possa correlacionar a execução de pipeline com os dados salvos pela atividade do notebook.
Por fim, você usa o hub do Monitor no Synapse Studio para monitorar a execução de pipeline, obter a ID de execução e localizar os arquivos correspondentes armazenados no data lake.
Sobre o Apache Spark e notebooks
O Apache Spark é uma estrutura de processamento paralelo que dá suporte ao processamento na memória para melhorar o desempenho de aplicativos de análise de Big Data. O Apache Spark no Azure Synapse Analytics é uma das implementações da Microsoft do Apache Spark na nuvem.
Um notebook do Apache Spark no Synapse Studio é uma interface da Web para você criar arquivos que contêm código em tempo real, visualizações e texto de narração. Os notebooks são um bom lugar para validar ideias e fazer experimentos rápidos para obter insights de seus dados. Os notebooks também são amplamente usados na preparação e visualização de dados, no aprendizado de máquina e em outros cenários de Big Data.
Criar um notebook Spark do Synapse
Suponha que você criou um fluxo de dados de mapeamento no Synapse Analytics para processar, ingressar e importar dados de perfil do usuário. Agora, você quer encontrar os cinco principais produtos para cada usuário, com base em quais são preferenciais e a principal escolha e têm mais compras nos últimos 12 meses. Depois, você quer calcular os cinco principais produtos de modo geral.
Neste exercício, você cria um notebook Spark do Synapse para fazer esses cálculos.
Abra o Synapse Analytics Studio (https://web.azuresynapse.net/) e vá para o hub de Dados.
Selecione a guia Vinculado(1) e expanda a conta de armazenamento do data lake principal (2) abaixo do Azure Data Lake Storage Gen2. Selecione o contêiner wwi-02(3) e abra a pasta principais produtos(4). Clique com o botão direito do mouse em qualquer arquivo Parquet (5), selecione o item de menu Novo notebook(6) e selecione Carregar no DataFrame (7). Se não vir a pasta, selecione
Refresh
.Verifique se o notebook está anexado ao pool do Spark.
Substitua o nome do arquivo Parquet por
*.parquet
(1) para selecionar todos os arquivos Parquet na pastatop-products
. Por exemplo, o caminho deve ser semelhante a:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Selecione Executar tudo na barra de ferramentas do notebook para executá-lo.
Observação
Na primeira vez que você executar um notebook em um pool do Spark, o Azure Synapse criará uma sessão. Isso pode levar aproximadamente de três a cinco minutos.
Observação
Para executar apenas a célula, focalize a célula e selecione o ícone Executar célula à esquerda da célula ou selecione a célula e insira Ctrl + Enter.
Crie uma célula abaixo selecionando o botão + e selecionando o item Célula de código. O botão + está localizado abaixo da célula do notebook, à esquerda. Como alternativa, expanda o menu + Célula na barra de ferramentas Notebook e selecione o item Célula de código.
Execute o seguinte comando na nova célula para popular um novo dataframe chamado
topPurchases
, criar uma exibição temporária chamadatop_purchases
e mostrar as primeiras 100 linhas:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
O resultado deve ser semelhante ao seguinte:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Execute o seguinte comando em uma nova célula para criar uma exibição temporária usando SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Observação
Não há nenhuma saída para esta consulta.
A consulta usa a exibição temporária
top_purchases
como origem e aplica um métodorow_number() over
para aplicar um número de linha para os registros de cada usuário, em queItemsPurchasedLast12Months
é o maior. A cláusulawhere
filtra os resultados para que recuperemos somente até cinco produtos, em queIsTopProduct
eIsPreferredProduct
estão definidos como verdadeiro. Isso nos dá os cinco produtos mais comprados para cada usuário, com esses produtos também identificados como os produtos favoritos de acordo com o perfil do usuário armazenado no Azure Cosmos DB.Execute o seguinte comando em uma nova célula para criar e exibir um DataFrame que armazena os resultados da exibição temporária
top_5_products
criada na célula anterior:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Você verá uma saída semelhante à seguinte, que mostra os cinco produtos de maior preferência de cada usuário:
Calcule os cinco principais produtos de modo geral com base naqueles que são de maior preferência dos clientes e são mais adquiridos. Para fazer isso, execute o seguinte comando em uma nova célula:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
Nessa célula, agrupamos os cinco produtos de maior preferência por ID do produto, somamos o total de itens comprados nos últimos 12 meses, classificamos esse valor em ordem decrescente e retornamos os cinco primeiros resultados. A saída deve ser semelhante a esta:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Criar uma célula de parâmetro
Os pipelines do Azure Synapse procuram a célula de parâmetros e a tratam como padrão para os parâmetros passados no momento da execução. O mecanismo de execução adicionará uma nova célula abaixo da célula com parâmetros de entrada para substituir os valores padrão. Quando uma célula de parâmetros não for designada, a célula injetada será inserida na parte superior do notebook.
Vamos executar esse notebook de um pipeline. Queremos passar um parâmetro que define um valor de variável
runId
que será usado para nomear o arquivo Parquet. Execute o seguinte comando em uma nova célula:import uuid # Generate random GUID runId = uuid.uuid4()
Estamos usando a biblioteca
uuid
que vem com o Spark para gerar um GUID aleatório. Queremos substituir a variávelrunId
por um parâmetro passado pelo pipeline. Para isso, precisamos alterná-la como uma célula de parâmetro.Selecione as reticências de ações (...) no canto superior direito da célula (1) e selecione Alternar célula de parâmetro (2).
Depois de alternar essa opção, você verá a marca Parâmetros na célula.
Cole o código a seguir em uma nova célula para usar a variável
runId
como o nome do arquivo Parquet no caminho/top5-products/
na conta principal do data lake. SubstituaYOUR_DATALAKE_NAME
no caminho pelo nome de sua conta principal do data lake. Para encontrá-lo, role para cima até a Célula 1 na parte superior da página (1). Copie a conta de armazenamento do data lake do caminho (2). Cole esse valor como substituição deYOUR_DATALAKE_NAME
no caminho (3) dentro da nova célula e execute o comando na célula.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Verifique se o arquivo foi gravado no data lake. Vá até o hub de Dados e selecione a guia Vinculado(1). Expanda a conta do data lake storage principal e selecione o contêiner wwi-02(2). Vá até a pasta top5-products(3). Você deve ver uma pasta para o arquivo Parquet no diretório com um GUID como nome do arquivo (4).
O método de gravação Parquet no dataframe na célula do Notebook criou esse diretório, pois ele não existia.
Adicionar o notebook a um pipeline do Synapse
Voltando ao Fluxo de Dados de Mapeamento que descrevemos no início do exercício, suponha que você queira executar esse notebook após o Fluxo de Dados ser executado como parte do processo de orquestração. Para fazer isso, você adiciona o notebook a um pipeline como uma nova atividade de Notebook.
Volte ao notebook. Selecione Propriedades (1) no canto superior direito do notebook e insira
Calculate Top 5 Products
para o Nome (2).Selecione Adicionar ao pipeline (1) no canto superior direito do notebook e selecione Pipeline existente (2).
Selecione o pipeline Gravar dados de perfil do usuário no ASA(1) e selecione Adicionar *(2).
O Synapse Studio adiciona a atividade Notebook ao pipeline. Reorganize a atividade Notebook para que ela fique à direita da atividade Data flow. Selecione a atividade Fluxo de Dados e arraste uma caixa verde de conexão do pipeline da atividade Success para a atividade Notebook.
A seta da atividade Sucesso instrui o pipeline a executar a atividade de notebook após a atividade do fluxo de dados ser executada com êxito.
Selecione a atividade Notebook (1), selecione a guia Configurações(2), expanda Parâmetros de base (3) e selecione + Novo (4). Insira
runId
no campo de Nome(5). Selecione Cadeia de caracteres como Tipo (6). Para Valor, selecione Adicionar conteúdo dinâmico (7).Selecione ID de execução de pipeline em Variáveis do sistema (1). Isso adiciona
@pipeline().RunId
à caixa de conteúdo dinâmico (2). Selecione Concluir (3) para fechar a caixa de diálogo.O valor da ID de execução de pipeline é um GUID exclusivo atribuído a cada execução de pipeline. Usaremos esse valor para o nome do arquivo Parquet passando-o como o parâmetro de Notebook
runId
. Em seguida, podemos examinar o histórico de execuções do pipeline e localizar o arquivo Parquet específico criado para cada execução de pipeline.Selecione Publicar tudo e Publicar para salvar as alterações.
Após a publicação ser concluída, selecione Adicionar gatilho (1) e Disparar agora (2) para executar o pipeline atualizado.
Selecione OK para executar o gatilho.
Monitorar a execução de pipeline
O hub do Monitor permite monitorar atividades atuais e históricas do SQL, do Apache Spark e dos Pipelines.
Vá para o hub Monitorar.
Selecione Execuções de pipeline (1) e aguarde a conclusão bem-sucedida da execução de pipeline (2). Talvez seja necessário atualizar (3) a exibição.
Selecione o nome do pipeline para exibir as execuções de atividade dele.
Observe a atividade Fluxo de Dados e a nova atividade Notebook(1). Anote o valor da ID de execução do pipeline(2). Nós a compararemos ao nome do arquivo Parquet gerado pelo notebook. Selecione o nome do notebook Calcular os 5 principais produtos para exibir seus detalhes (3).
Aqui, vemos os detalhes da execução do Notebook. Selecione Reprodução (1) para assistir a uma reprodução do progresso por meio dos trabalhos (2). Na parte inferior, você pode exibir o Diagnóstico e os Logs com opções de filtro diferentes (3). À direita, podemos exibir os detalhes da executar, como a duração, a ID do Livy, os detalhes do pool do Spark e assim por diante. Selecione o link Exibir detalhes em um trabalho para exibir seus detalhes (5).
A interface do usuário do aplicativo Spark é aberta em uma nova guia, onde podemos ver os detalhes da fase. Expanda a Visualização DAG para exibir os detalhes da fase.
Volte para o hub Dados.
Selecione a guia Vinculado(1) e selecione o contêiner wwi-02(2) na conta do data lake storage principal, vá até a pasta Cinco principais produtos(3) e verifique se existe uma pasta para o arquivo Parquet cujo nome corresponde à ID de execução de pipeline.
Como você pode ver, temos um arquivo cujo nome corresponde à ID de execução de pipeline que observamos anteriormente:
Esses valores correspondem porque passamos a ID de execução do pipeline para o parâmetro
runId
na atividade Notebook.