Exercício – Integrar um Notebook nos pipelines do Azure Synapse

Concluído

Nesta unidade, você cria um notebook Spark do Azure Synapse para analisar e transformar dados carregados por um fluxo de dados de mapeamento e armazenar os dados em um data lake. Você cria uma célula de parâmetro que aceita um parâmetro de cadeia de caracteres que define o nome da pasta para os dados que o notebook grava no data lake.

Em seguida, você adiciona esse notebook a um pipeline do Synapse e passa a ID de execução de pipeline exclusiva para o parâmetro do notebook, para que possa posteriormente você possa correlacionar a execução de pipeline com os dados salvos pela atividade do notebook.

Por fim, você usa o hub do Monitor no Synapse Studio para monitorar a execução de pipeline, obter a ID de execução e localizar os arquivos correspondentes armazenados no data lake.

Sobre o Apache Spark e notebooks

O Apache Spark é uma estrutura de processamento paralelo que dá suporte ao processamento na memória para melhorar o desempenho de aplicativos de análise de Big Data. O Apache Spark no Azure Synapse Analytics é uma das implementações da Microsoft do Apache Spark na nuvem.

Um notebook do Apache Spark no Synapse Studio é uma interface da Web para você criar arquivos que contêm código em tempo real, visualizações e texto de narração. Os notebooks são um bom lugar para validar ideias e fazer experimentos rápidos para obter insights de seus dados. Os notebooks também são amplamente usados na preparação e visualização de dados, no aprendizado de máquina e em outros cenários de Big Data.

Criar um notebook Spark do Synapse

Suponha que você criou um fluxo de dados de mapeamento no Synapse Analytics para processar, ingressar e importar dados de perfil do usuário. Agora, você quer encontrar os cinco principais produtos para cada usuário, com base em quais são preferenciais e a principal escolha e têm mais compras nos últimos 12 meses. Depois, você quer calcular os cinco principais produtos de modo geral.

Neste exercício, você cria um notebook Spark do Synapse para fazer esses cálculos.

  1. Abra o Synapse Analytics Studio (https://web.azuresynapse.net/) e vá para o hub de Dados.

    O item de menu Dados é realçado.

  2. Selecione a guia Vinculado(1) e expanda a conta de armazenamento do data lake principal (2) abaixo do Azure Data Lake Storage Gen2. Selecione o contêiner wwi-02(3) e abra a pasta principais produtos(4). Clique com o botão direito do mouse em qualquer arquivo Parquet (5), selecione o item de menu Novo notebook(6) e selecione Carregar no DataFrame (7). Se não vir a pasta, selecione Refresh.

    O arquivo Parquet e a opção Novo notebook estão realçados.

  3. Verifique se o notebook está anexado ao pool do Spark.

    O item de menu Anexar ao pool do Spark é realçado.

  4. Substitua o nome do arquivo Parquet por *.parquet (1) para selecionar todos os arquivos Parquet na pasta top-products. Por exemplo, o caminho deve ser semelhante a: abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    O nome do arquivo é realçado.

  5. Selecione Executar tudo na barra de ferramentas do notebook para executá-lo.

    Os resultados da célula são exibidos.

    Observação

    Na primeira vez que você executar um notebook em um pool do Spark, o Azure Synapse criará uma sessão. Isso pode levar aproximadamente de três a cinco minutos.

    Observação

    Para executar apenas a célula, focalize a célula e selecione o ícone Executar célula à esquerda da célula ou selecione a célula e insira Ctrl + Enter.

  6. Crie uma célula abaixo selecionando o botão + e selecionando o item Célula de código. O botão + está localizado abaixo da célula do notebook, à esquerda. Como alternativa, expanda o menu + Célula na barra de ferramentas Notebook e selecione o item Célula de código.

    A opção de menu Adicionar Código é realçada.

  7. Execute o seguinte comando na nova célula para popular um novo dataframe chamado topPurchases, criar uma exibição temporária chamada top_purchases e mostrar as primeiras 100 linhas:

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    O resultado deve ser semelhante ao seguinte:

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Execute o seguinte comando em uma nova célula para criar uma exibição temporária usando SQL:

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Observação

    Não há nenhuma saída para esta consulta.

    A consulta usa a exibição temporária top_purchases como origem e aplica um método row_number() over para aplicar um número de linha para os registros de cada usuário, em que ItemsPurchasedLast12Months é o maior. A cláusula where filtra os resultados para que recuperemos somente até cinco produtos, em que IsTopProduct e IsPreferredProduct estão definidos como verdadeiro. Isso nos dá os cinco produtos mais comprados para cada usuário, com esses produtos também identificados como os produtos favoritos de acordo com o perfil do usuário armazenado no Azure Cosmos DB.

  9. Execute o seguinte comando em uma nova célula para criar e exibir um DataFrame que armazena os resultados da exibição temporária top_5_products criada na célula anterior:

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Você verá uma saída semelhante à seguinte, que mostra os cinco produtos de maior preferência de cada usuário:

    Os cinco produtos de maior preferência são exibidos por usuário.

  10. Calcule os cinco principais produtos de modo geral com base naqueles que são de maior preferência dos clientes e são mais adquiridos. Para fazer isso, execute o seguinte comando em uma nova célula:

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    Nessa célula, agrupamos os cinco produtos de maior preferência por ID do produto, somamos o total de itens comprados nos últimos 12 meses, classificamos esse valor em ordem decrescente e retornamos os cinco primeiros resultados. A saída deve ser semelhante a esta:

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Criar uma célula de parâmetro

Os pipelines do Azure Synapse procuram a célula de parâmetros e a tratam como padrão para os parâmetros passados no momento da execução. O mecanismo de execução adicionará uma nova célula abaixo da célula com parâmetros de entrada para substituir os valores padrão. Quando uma célula de parâmetros não for designada, a célula injetada será inserida na parte superior do notebook.

  1. Vamos executar esse notebook de um pipeline. Queremos passar um parâmetro que define um valor de variável runId que será usado para nomear o arquivo Parquet. Execute o seguinte comando em uma nova célula:

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Estamos usando a biblioteca uuid que vem com o Spark para gerar um GUID aleatório. Queremos substituir a variável runId por um parâmetro passado pelo pipeline. Para isso, precisamos alterná-la como uma célula de parâmetro.

  2. Selecione as reticências de ações (...) no canto superior direito da célula (1) e selecione Alternar célula de parâmetro (2).

    O item de menu é realçado.

    Depois de alternar essa opção, você verá a marca Parâmetros na célula.

    A célula está configurada para aceitar parâmetros.

  3. Cole o código a seguir em uma nova célula para usar a variável runId como o nome do arquivo Parquet no caminho /top5-products/ na conta principal do data lake. Substitua YOUR_DATALAKE_NAME no caminho pelo nome de sua conta principal do data lake. Para encontrá-lo, role para cima até a Célula 1 na parte superior da página (1). Copie a conta de armazenamento do data lake do caminho (2). Cole esse valor como substituição de YOUR_DATALAKE_NAME no caminho (3) dentro da nova célula e execute o comando na célula.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    O caminho é atualizado com o nome da conta principal do data lake.

  4. Verifique se o arquivo foi gravado no data lake. Vá até o hub de Dados e selecione a guia Vinculado(1). Expanda a conta do data lake storage principal e selecione o contêiner wwi-02(2). Vá até a pasta top5-products(3). Você deve ver uma pasta para o arquivo Parquet no diretório com um GUID como nome do arquivo (4).

    O arquivo parquet é realçado.

    O método de gravação Parquet no dataframe na célula do Notebook criou esse diretório, pois ele não existia.

Adicionar o notebook a um pipeline do Synapse

Voltando ao Fluxo de Dados de Mapeamento que descrevemos no início do exercício, suponha que você queira executar esse notebook após o Fluxo de Dados ser executado como parte do processo de orquestração. Para fazer isso, você adiciona o notebook a um pipeline como uma nova atividade de Notebook.

  1. Volte ao notebook. Selecione Propriedades (1) no canto superior direito do notebook e insira Calculate Top 5 Products para o Nome (2).

    A folha propriedades é exibida.

  2. Selecione Adicionar ao pipeline (1) no canto superior direito do notebook e selecione Pipeline existente (2).

    O botão Adicionar ao pipeline é realçado.

  3. Selecione o pipeline Gravar dados de perfil do usuário no ASA(1) e selecione Adicionar *(2).

    O pipeline é selecionado.

  4. O Synapse Studio adiciona a atividade Notebook ao pipeline. Reorganize a atividade Notebook para que ela fique à direita da atividade Data flow. Selecione a atividade Fluxo de Dados e arraste uma caixa verde de conexão do pipeline da atividade Success para a atividade Notebook.

    A seta verde é realçada.

    A seta da atividade Sucesso instrui o pipeline a executar a atividade de notebook após a atividade do fluxo de dados ser executada com êxito.

  5. Selecione a atividade Notebook (1), selecione a guia Configurações(2), expanda Parâmetros de base (3) e selecione + Novo (4). Insira runId no campo de Nome(5). Selecione Cadeia de caracteres como Tipo (6). Para Valor, selecione Adicionar conteúdo dinâmico (7).

    As configurações são exibidas.

  6. Selecione ID de execução de pipeline em Variáveis do sistema (1). Isso adiciona @pipeline().RunId à caixa de conteúdo dinâmico (2). Selecione Concluir (3) para fechar a caixa de diálogo.

    O formulário de conteúdo dinâmico é exibido.

    O valor da ID de execução de pipeline é um GUID exclusivo atribuído a cada execução de pipeline. Usaremos esse valor para o nome do arquivo Parquet passando-o como o parâmetro de Notebook runId. Em seguida, podemos examinar o histórico de execuções do pipeline e localizar o arquivo Parquet específico criado para cada execução de pipeline.

  7. Selecione Publicar tudo e Publicar para salvar as alterações.

    Publicar tudo é realçado.

  8. Após a publicação ser concluída, selecione Adicionar gatilho (1) e Disparar agora (2) para executar o pipeline atualizado.

    O item de menu de gatilho é realçado.

  9. Selecione OK para executar o gatilho.

    O botão OK é realçado.

Monitorar a execução de pipeline

O hub do Monitor permite monitorar atividades atuais e históricas do SQL, do Apache Spark e dos Pipelines.

  1. Vá para o hub Monitorar.

    O item de menu do hub Monitorar é selecionado.

  2. Selecione Execuções de pipeline (1) e aguarde a conclusão bem-sucedida da execução de pipeline (2). Talvez seja necessário atualizar (3) a exibição.

    A execução de pipeline foi bem-sucedida.

  3. Selecione o nome do pipeline para exibir as execuções de atividade dele.

    O nome da execução de pipeline é selecionado.

  4. Observe a atividade Fluxo de Dados e a nova atividade Notebook(1). Anote o valor da ID de execução do pipeline(2). Nós a compararemos ao nome do arquivo Parquet gerado pelo notebook. Selecione o nome do notebook Calcular os 5 principais produtos para exibir seus detalhes (3).

    Os detalhes da execução de pipeline são exibidos.

  5. Aqui, vemos os detalhes da execução do Notebook. Selecione Reprodução (1) para assistir a uma reprodução do progresso por meio dos trabalhos (2). Na parte inferior, você pode exibir o Diagnóstico e os Logs com opções de filtro diferentes (3). À direita, podemos exibir os detalhes da executar, como a duração, a ID do Livy, os detalhes do pool do Spark e assim por diante. Selecione o link Exibir detalhes em um trabalho para exibir seus detalhes (5).

    Os detalhes da execução são exibidos.

  6. A interface do usuário do aplicativo Spark é aberta em uma nova guia, onde podemos ver os detalhes da fase. Expanda a Visualização DAG para exibir os detalhes da fase.

    Os detalhes da preparação do Spark são exibidos.

  7. Volte para o hub Dados.

    Hub de dados.

  8. Selecione a guia Vinculado(1) e selecione o contêiner wwi-02(2) na conta do data lake storage principal, vá até a pasta Cinco principais produtos(3) e verifique se existe uma pasta para o arquivo Parquet cujo nome corresponde à ID de execução de pipeline.

    O arquivo é realçado.

    Como você pode ver, temos um arquivo cujo nome corresponde à ID de execução de pipeline que observamos anteriormente:

    A ID da execução de pipeline é realçada.

    Esses valores correspondem porque passamos a ID de execução do pipeline para o parâmetro runId na atividade Notebook.