Exercice - Intégrer un notebook dans des pipelines Azure Synapse
Dans cette unité, vous créez un notebook Azure Synapse Spark pour analyser et transformer les données chargées par un flux de données de mappage et stocker les données dans un lac de données. Vous créez une cellule de paramètre qui accepte un paramètre de chaîne qui définit le nom de dossier des données que le notebook écrit dans le lac de données.
Vous ajoutez ensuite ce notebook à un pipeline Synapse et passez l’ID d’exécution du pipeline unique au paramètre de notebook afin de pouvoir corréler par la suite l’exécution du pipeline avec les données enregistrées par l’activité du notebook.
Enfin, vous utilisez le hub Superviser dans Synapse Studio pour superviser l’exécution du pipeline, obtenir l’ID d’exécution, puis rechercher les fichiers correspondants stockés dans le lac de données.
À propos d’Apache Spark et des notebooks
Apache Spark est un framework de traitement parallèle qui prend en charge le traitement en mémoire pour améliorer les performances des applications d’analytique du Big Data. Apache Spark dans Azure Synapse Analytics est l’une des implémentations par Microsoft d’Apache Spark dans le cloud.
Un notebook Apache Spark dans Synapse Studio est une interface web vous permettant de créer des fichiers contenant du code, des visualisations et du texte descriptif dynamiques. Les notebooks constituent un bon endroit où valider des idées et effectuer des expérimentations rapides pour extraire des insights de vos données. Les notebooks sont également largement utilisés pour la préparation et la visualisation de données, le machine learning et d’autres scénarios de Big Data.
Créer un notebook Synapse Spark
Supposez que vous avez créé un flux de données de mappage dans Synapse Analytics pour traiter, joindre et importer des données de profil utilisateur. Vous souhaitez à présent rechercher les cinq premiers produits pour chaque utilisateur, en fonction de ceux qui sont à la fois préférés et premiers choix, et qui comptabilisent le plus d’achats au cours des 12 derniers mois. Ensuite, vous souhaitez calculer les cinq premiers produits tous confondus.
Au cours de cet exercice, vous allez créer un notebook Synapse Spark pour effectuer ces calculs.
Ouvrez Synapse Analytics Studio (https://web.azuresynapse.net/), puis accédez au hub Données.
Sélectionnez l’onglet Lié(1) et développez le compte de stockage de lac de données principal (2) sous Azure Data Lake Storage Gen2. Sélectionnez le conteneur wwi-02(3) et ouvrez le dossier top-products(4). Cliquez avec le bouton droit sur n’importe quel fichier Parquet (5), sélectionnez l’élément de menu Nouveau notebook(6), puis sélectionnez Chargement dans le dataframe (7). Si vous ne voyez pas le dossier, sélectionnez
Refresh
.Assurez-vous que le notebook est attaché au pool Spark.
Remplacez le nom de fichier Parquet par
*.parquet
(1) pour sélectionner tous les fichiers Parquet dans le dossiertop-products
. Par exemple, le chemin doit ressembler à ce qui suit :abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Sélectionnez Exécuter tout dans la barre d’outils du notebook pour exécuter ce dernier.
Notes
La première fois que vous exécutez un notebook dans un pool Spark, Synapse crée une session. Cette opération peut prendre environ trois à cinq minutes.
Notes
Pour exécuter uniquement la cellule, pointez dessus et sélectionnez l’icône Exécuter la cellule à gauche de la cellule, ou sélectionnez la cellule, puis appuyez sur Ctrl + Entrée.
Créez une nouvelle cellule en-dessous en sélectionnant le bouton + et en choisissant l’élément Cellule de code. Le bouton + se trouve sous la cellule de notebook sur la gauche. Vous pouvez également développer le menu + Cellule dans la barre d’outils Notebook et sélectionner l’élément Cellule de code.
Exécutez la commande suivante dans la nouvelle cellule pour remplir un nouveau dataframe appelé
topPurchases
, créer une nouvelle vue temporaire appeléetop_purchases
et afficher les 100 premières lignes :topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Le résultat doit être semblable à ce qui suit :
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Exécutez la commande suivante dans une nouvelle cellule pour créer une nouvelle vue temporaire à l’aide de SQL :
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Notes
Il n’y a aucune sortie pour cette requête.
La requête utilise la vue temporaire
top_purchases
comme source et fait appel à une méthoderow_number() over
pour appliquer un numéro de ligne aux enregistrements de chaque utilisateur oùItemsPurchasedLast12Months
est le plus grand. La clausewhere
filtre les résultats de sorte que nous récupérons uniquement jusqu’à cinq produits pour lesquels les deux valeursIsTopProduct
etIsPreferredProduct
sont définies sur true. Cela nous donne les cinq produits les plus achetés pour chaque utilisateur. Ils sont également identifiés comme leurs produits préférés selon leur profil utilisateur stocké dans Azure Cosmos DB.Exécutez la commande suivante dans une nouvelle cellule pour créer et afficher un nouveau DataFrame qui stocke les résultats de la vue temporaire
top_5_products
que vous avez créée dans la cellule précédente :top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Vous devez voir une sortie similaire à ce qui suit, qui affiche les cinq premiers produits préférés par utilisateur :
Calculez les cinq premiers produits tous confondus, en fonction de ceux qui sont préférés par les clients et achetés le plus. Pour ce faire, exécutez la commande suivante dans une nouvelle cellule :
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
Dans cette cellule, nous avons regroupé les cinq premiers produits préférés par ID de produit, totalisé le nombre total d’articles achetés ces 12 derniers mois, trié cette valeur dans l’ordre décroissant et retourné les cinq premiers résultats. Le résultat doit ressembler à ce qui suit :
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Créer une cellule de paramètre
Les pipelines Azure Synapse recherchent la cellule de paramètre et la traite comme cellule par défaut pour les paramètres passés au moment de l’exécution. Le moteur d’exécution ajoutera une nouvelle cellule sous la cellule des paramètres avec des paramètres d’entrée en vue de remplacer les valeurs par défaut. Lorsqu’il n’y a pas de cellule de paramètres désignée, la cellule injectée est insérée tout en haut du notebook.
Nous allons exécuter ce notebook à partir d’un pipeline. Nous voulons passer un paramètre qui définit une valeur de variable
runId
qui sera utilisée pour nommer le fichier Parquet. Exécutez la commande suivante dans une nouvelle cellule :import uuid # Generate random GUID runId = uuid.uuid4()
Nous utilisons la bibliothèque
uuid
fournie avec Spark pour générer un GUID aléatoire. Nous voulons remplacer la variablerunId
par un paramètre passé par le pipeline. Pour ce faire, nous devons l’activer comme cellule de paramètre.Sélectionnez les points de suspension des actions (...) en haut à droite de la cellule (1), puis sélectionnez Activer/désactiver la cellule de paramètre (2).
Une fois cette option activée, vous voyez l’étiquette Paramètres sur la cellule.
Collez le code suivant dans une nouvelle cellule pour utiliser la variable
runId
en tant que nom de fichier Parquet dans le chemin/top5-products/
du compte de lac de données principal. RemplacezYOUR_DATALAKE_NAME
dans le chemin par le nom de votre compte de lac de données principal. Pour le trouver, faites défiler jusqu’à la Cellule 1 en haut de la page (1). Copiez le compte de stockage de lac de données à partir du chemin (2). Collez cette valeur en remplacement deYOUR_DATALAKE_NAME
dans le chemin (3) dans la nouvelle cellule, puis exécutez la commande dans la cellule.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Vérifiez que le fichier a été écrit dans le lac de données. Accédez au hub Données et sélectionnez l’onglet Lié(1). Développez le compte de stockage de lac de données principal, puis sélectionnez le conteneur wwi-02(2). Accédez au dossier top5-products(3). Vous devriez voir un dossier pour le fichier Parquet dans le répertoire avec un GUID comme nom de fichier (4).
La méthode write Parquet sur le dataframe dans la cellule Notebook a créé ce répertoire puisqu’il n’existait pas avant.
Ajouter le notebook à un pipeline Synapse
Pour en revenir au flux de données de mappage que nous avons décrit au début de l’exercice, supposez que vous souhaitez exécuter ce notebook après l’exécution du flux de données dans le cadre de votre processus d’orchestration. Pour ce faire, vous ajoutez ce notebook à un pipeline en tant que nouvelle activité Notebook.
Revenez au notebook. Sélectionnez Propriétés (1) en haut à droite du notebook, puis entrez
Calculate Top 5 Products
pour le Nom (2).Sélectionnez Ajouter au pipeline (1) en haut à droite du notebook, puis sélectionnez Pipeline existant (2).
Sélectionnez le pipeline Écrire les données de profil utilisateur dans ASA(1), puis sélectionnez Ajouter *(2).
Synapse Studio ajoute l’activité Notebook au pipeline. Réorganisez l’activité Notebook pour qu’elle se trouve à droite de l’activité Flux de données. Sélectionnez l’activité Flux de données, puis faites glisser un carré vert de connexion de pipeline d’activité Réussite vers l’activité Notebook.
La flèche d’activité Réussite indique au pipeline d’exécuter l’activité Notebook après que l’exécution de l’activité Flux de données a réussi.
Sélectionnez l’activité Notebook (1), sélectionnez l’onglet Paramètres(2), développez Paramètres de base (3), puis sélectionnez + Nouveau (4). Entrez
runId
dans le champ Nom(5). Sélectionnez Chaîne comme Type (6). Pour la Valeur, sélectionnez Ajouter du contenu dynamique (7).Sélectionnez ID d’exécution du pipeline sous Variables système (1). Cela ajoute
@pipeline().RunId
à la zone de contenu dynamique (2). Sélectionnez Terminer (3) pour fermer la boîte de dialogue.La valeur de l’ID d’exécution du pipeline est un GUID unique affecté à chaque exécution du pipeline. Nous allons utiliser cette valeur pour le nom du fichier Parquet en la passant en tant que paramètre de notebook
runId
. Nous pouvons ensuite examiner l’historique des exécutions du pipeline et rechercher le fichier Parquet spécifique créé pour chaque exécution du pipeline.Sélectionnez Tout publier puis Publier pour enregistrer vos modifications.
Une fois la publication terminée, sélectionnez Ajouter un déclencheur (1), puis Déclencher maintenant (2) pour exécuter le pipeline mis à jour.
Sélectionnez OK pour exécuter le déclencheur.
Surveiller l’exécution du pipeline.
Le hub Superviser vous permet de superviser les activités actuelles et historiques de SQL, Apache Spark et Pipelines.
Accédez au hub Superviser.
Sélectionnez Exécutions du pipeline (1) et attendez que l’exécution du pipeline se termine correctement (2). Vous devrez peut-être actualiser (3) la vue.
Sélectionnez le nom du pipeline pour voir les exécutions de l’activité du pipeline.
Notez à la fois l’activité Flux de données et la nouvelle activité Notebook(1). Notez la valeur de l’ID d’exécution du pipeline(2). Nous allons la comparer au nom du fichier Parquet généré par le notebook. Sélectionnez le nom du notebook Calculer les 5 premiers produits pour en voir les détails (3).
Nous voyons ici les détails de l’exécution de Notebook. Vous pouvez sélectionner Lecture (1) pour regarder une lecture de la progression dans les travaux (2). En bas, vous pouvez voir les Diagnostics et les Journaux avec différentes options de filtre (3). À droite, nous pouvons voir les détails de l’exécution, tels que la durée, l’ID Livy, les détails du pool Spark, et ainsi de suite. Sélectionnez le lien Afficher les détails sur un travail pour voir des informations détaillées à son sujet (5).
L’interface utilisateur de l’application Spark s’ouvre dans un nouvel onglet dans lequel nous pouvons voir les détails de la phase. Développez la visualisation DAG pour voir les détails de la phase.
Revenez au hub Données.
Sélectionnez l’onglet Lié(1), sélectionnez le conteneur wwi-02(2) sur le compte de stockage de lac de données principal, accédez au dossier top5-products(3), puis vérifiez qu’il existe un dossier pour le fichier Parquet dont le nom correspond à l’ID d’exécution du pipeline.
Comme vous pouvez le voir, nous avons un fichier dont le nom correspond à l’ID d’exécution du pipeline noté précédemment :
Ces valeurs correspondent parce que nous avons passé l’ID d’exécution du pipeline au paramètre
runId
sur l’activité Notebook.