Partager via


Utiliser l’API Livy pour envoyer et exécuter des travaux par lots Livy

Remarque

L’API Livy pour Engineering données Fabric est en préversion.

S’applique à :✅ l’engineering et la science des données dans Microsoft Fabric

Envoyez des travaux par lots Spark à l’aide de l’API Livy pour Engineering données Fabric.

Prérequis

L’API Livy définit un point de terminaison unifié pour les opérations. Remplacez les espaces réservés {Entra_TenantID}, {Entra_ClientID}, {Fabric_WorkspaceID} et {Fabric_LakehouseID} par vos valeurs appropriées lorsque vous suivez les exemples de cet article.

Configurer Visual Studio Code pour votre lot d’API Livy

  1. Sélectionnez Paramètres Lakehouse dans votre lakehouse Fabric.

    Capture d’écran montrant les paramètres du lakehouse.

  2. Accédez à la section Point de terminaison Livy.

    Capture d’écran montrant le point de terminaison Lakehouse Livy et la chaîne de connexion de travail de session.

  3. Copiez la chaîne de connexion du travail par lots (deuxième zone rouge dans l’image) dans votre code.

  4. Accédez au centre d’administration Microsoft Entra et copiez l’ID d’application (client) et l’ID d’annuaire (locataire) dans votre code.

    Capture d’écran montrant la vue d’ensemble de l’application d’API Livy dans le centre d’administration Microsoft Entra.

Créer une charge utile Spark à charger dans votre Lakehouse

  1. Créez un notebook .ipynb dans Visual Studio Code et insérez le code suivant.

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM Guyhay_LivyDemo2.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. Enregistrez le fichier Python localement. Cette charge utile de code Python contient deux instructions Spark qui fonctionnent sur des données dans un Lakehouse et qui doivent être chargées dans votre Lakehouse. Vous aurez besoin du chemin ABFS de la charge utile pour référencer votre travail par lots d’API Livy dans Visual Studio Code.

    Capture d’écran montrant la cellule de charge utile Python.

  3. Chargez la charge utile Python dans la section des fichiers de votre Lakehouse. > Obtenir des données > Charger des fichiers > et cliquez dans la zone d’entrée Fichiers.

    Capture d’écran montrant la charge utile dans la section Fichiers du Lakehouse.

  4. Une fois que le fichier est dans la section Fichiers de votre Lakehouse, cliquez sur les trois petits points à droite du nom de fichier de votre charge utile, puis sélectionnez Propriétés.

    Capture d’écran montrant le chemin ABFS de la charge utile dans les Propriétés du fichier dans le Lakehouse.

  5. Copiez ce chemin ABFS dans votre cellule Notebook à l’étape 1.

Créer une session par lots Spark d’API Livy

  1. Créez un notebook .ipynb dans Visual Studio Code et insérez le code suivant.

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. Exécutez la cellule de notebook. Une fenêtre contextuelle devrait apparaître dans votre navigateur pour vous permettre de choisir l’identité avec laquelle vous connecter.

    Capture d’écran montrant l’écran de connexion à l’application Microsoft Entra.

  3. Une fois que vous avez choisi l’identité à utiliser pour vous connecter, vous êtes également invité à approuver les autorisations de l’API d’inscription d’application Microsoft Entra.

    Capture d’écran montrant les autorisations de l’API d’application Microsoft Entra.

  4. Fermez la fenêtre du navigateur après avoir procédé à l’authentification.

    Capture d’écran montrant l’authentification réussie.

  5. Dans Visual Studio Code, vous devriez voir le jeton Microsoft Entra retourné.

    Capture d’écran montrant le jeton Microsoft Entra retourné après l’exécution de la cellule et la connexion.

  6. Ajoutez une autre cellule de notebook et insérez ce code.

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. Exécutez la cellule de notebook. Vous devriez voir deux lignes qui s’affichent lors de la création du travail par lots Livy.

    Capture d’écran montrant les résultats de la création de la session par lots.

Envoyer une instruction spark.sql à l’aide de la session par lots de l’API Livy

  1. Ajoutez une autre cellule de notebook et insérez ce code.

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. Exécutez la cellule de notebook. Vous devriez voir plusieurs lignes qui s’affichent lors de la création et de l’exécution du travail par lots Livy.

    Capture d’écran montrant les résultats dans Visual Studio Code après avoir correctement envoyé le travail par lots Livy.

  3. Revenez à votre Lakehouse pour voir les changements.

Afficher vos travaux dans le hub de supervision

Vous pouvez accéder au hub de supervision pour afficher diverses activités Apache Spark en sélectionnant Superviser dans les liens de navigation de gauche.

  1. Lorsque le travail par lots est à l’état terminé, vous pouvez voir l’état de la session en accédant à Superviser.

    Capture d’écran montrant les envois d’API Livy précédents dans le hub de supervision.

  2. Sélectionnez et ouvrez le nom de l’activité la plus récente.

    Capture d’écran montrant l’activité la plus récente de l’API Livy dans le hub de supervision.

  3. Dans ce cas de session d’API Livy, vous pouvez voir votre envoi de lot précédent, les détails d’exécution, les versions Spark et la configuration. Notez l’état Arrêté en haut à droite.

    Capture d’écran montrant les détails de l’activité la plus récente de l’API Livy dans le hub de supervision.

Pour récapituler l’ensemble du processus, vous avez besoin d’un client à distance tel que Visual Studio Code, d’un jeton d’application Microsoft Entra, de l’URL du point de terminaison de l’API Livy, de l’authentification auprès de votre Lakehouse, d’une charge de travail Spark dans votre Lakehouse et enfin d’une session d’API Livy par lots.