共用方式為


使用 Livy API 提交和執行 Livy 批次作業

注意

適用於網狀架構的 Livy API 資料工程師 處於預覽狀態。

適用於:✅Microsoft Fabric 中的 資料工程師 和 資料科學

使用適用於網狀架構的 Livy API 資料工程師 提交 Spark 批次作業。

必要條件

Livy API 會定義作業的統一端點。 當您遵循本文中的範例時,請將佔位元 {Entra_TenantID}、{Entra_ClientID}、{Fabric_WorkspaceID}和 {Fabric_LakehouseID} 取代為您適當的值。

設定 Livy API Batch 的 Visual Studio Code

  1. 在 Fabric Lakehouse 中選取 [Lakehouse 設定 ]。

    顯示 Lakehouse 設定的螢幕快照。

  2. 流覽至 [Livy 端點 ] 區段。

    顯示 Lakehouse Livy 端點和工作階段作業 連接字串 的螢幕快照。

  3. 將 Batch 作業 連接字串 (映像中的第二個紅色方塊) 複製到您的程式代碼。

  4. 流覽至 Microsoft Entra 系統管理中心 ,並將應用程式 (用戶端) 識別碼和目錄 (租使用者) 識別碼複製到您的程式碼。

    顯示 Microsoft Entra 系統管理中心中 Livy API 應用程式概觀的螢幕快照。

建立 Spark 承載並上傳至您的 Lakehouse

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼

    import sys
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.conf import SparkConf
    from pyspark.sql.functions import col
    
    if __name__ == "__main__":
    
         #Spark session builder
         spark_session = (SparkSession
           .builder
           .appName("livybatchdemo") 
           .getOrCreate())
    
         spark_context = spark_session.sparkContext
         spark_context.setLogLevel("DEBUG")  
    
         targetLakehouse = spark_context.getConf().get("spark.targetLakehouse")
    
         if targetLakehouse is not None:
           print("targetLakehouse: " + str(targetLakehouse))
         else:
           print("targetLakehouse is None")
    
    df_valid_totalPrice = spark_session.sql("SELECT * FROM <YourLakeHouseDataTableName>.transactions where TotalPrice > 0")
    df_valid_totalPrice_plus_year = df_valid_totalPrice.withColumn("transaction_year", col("TransactionDate").substr(1, 4))
    
    deltaTablePath = "abfss:<YourABFSSpath>"+str(targetLakehouse)+".Lakehouse/Tables/CleanedTransactions"
    df_valid_totalPrice_plus_year.write.mode('overwrite').format('delta').save(deltaTablePath)
    
  2. 將 Python 檔案儲存在本機。 此 Python 程式代碼承載包含兩個 Spark 語句,可在 Lakehouse 中處理數據,且必須上傳至您的 Lakehouse。 您需要承載的 ABFS 路徑,才能在 Visual Studio Code 中的 Livy API 批次作業中參考,以及 Select SQL 語句中的 Lakehouse 數據表名稱。

    顯示 Python 承載儲存格的螢幕快照。

  3. 將 Python 承載上傳至 Lakehouse 的檔案區段。 > 取得數據 > 上傳檔案 > 按兩下 [檔案/ 輸入] 方塊。

    顯示 Lakehouse 檔案區段中承載的螢幕快照。

  4. 檔案位於 Lakehouse 的 [檔案] 區段之後,按兩下承載檔名右邊的三個點,然後選取 [屬性]。

    顯示 Lakehouse 中檔案屬性中承載 ABFS 路徑的螢幕快照。

  5. 將此 ABFS 路徑複製到步驟 1 中的 Notebook 單元格。

建立 Livy API Spark 批次工作階段

  1. 在 Visual Studio Code 中建立 .ipynb 筆記本,並插入下列程式代碼。

    
    from msal import PublicClientApplication
    import requests
    import time
    
    tenant_id = "<Entra_TenantID>"
    client_id = "<Entra_ClientID>"
    
    workspace_id = "<Fabric_WorkspaceID>"
    lakehouse_id = "<Fabric_LakehouseID>"
    
    app = PublicClientApplication(
       client_id,
       authority="https://login.microsoftonline.com/43a26159-4e8e-442a-9f9c-cb7a13481d48"
    )
    
    result = None
    
     # If no cached tokens or user interaction needed, acquire tokens interactively
     if not result:
         result = app.acquire_token_interactive(scopes=["https://api.fabric.microsoft.com/Lakehouse.Execute.All", "https://api.fabric.microsoft.com/Lakehouse.Read.All", "https://api.fabric.microsoft.com/Item.ReadWrite.All", 
                                                    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All", "https://api.fabric.microsoft.com/Code.AccessStorage.All", "https://api.fabric.microsoft.com/Code.AccessAzureKeyvault.All", 
                                                    "https://api.fabric.microsoft.com/Code.AccessAzureDataExplorer.All", "https://api.fabric.microsoft.com/Code.AccessAzureDataLake.All", "https://api.fabric.microsoft.com/Code.AccessFabric.All"])
    
    # Print the access token (you can use it to call APIs)
    if "access_token" in result:
       print(f"Access token: {result['access_token']}")
    else:
       print("Authentication failed or no access token obtained.")
    
    if "access_token" in result:
       access_token = result['access_token']
       api_base_url_mist='https://api.fabric.microsoft.com/v1'
       livy_base_url = api_base_url_mist + "/workspaces/"+workspace_id+"/lakehouses/"+lakehouse_id +"/livyApi/versions/2023-12-01/batches"
       headers = {"Authorization": "Bearer " + access_token}
    
  2. 執行筆記本數據格,瀏覽器中應該會出現彈出視窗,讓您選擇要用來登入的身分識別。

    顯示登入畫面至 Microsoft Entra 應用程式的螢幕快照。

  3. 選擇要登入的身分識別之後,系統也會要求您核准 Microsoft Entra 應用程式註冊 API 許可權。

    顯示Microsoft Entra 應用程式 API 許可權的螢幕快照。

  4. 完成驗證之後關閉瀏覽器視窗。

    顯示驗證完成的螢幕快照。

  5. 在 Visual Studio Code 中,您應該會看到傳回Microsoft Entra 令牌。

    此螢幕快照顯示執行儲存格和登入之後傳回Microsoft Entra 令牌。

  6. 新增另一個筆記本數據格,並插入此程序代碼。

    # call get batch API
    
    get_livy_get_batch = livy_base_url
    get_batch_response = requests.get(get_livy_get_batch, headers=headers)
    if get_batch_response.status_code == 200:
       print("API call successful")
       print(get_batch_response.json())
    else:
       print(f"API call failed with status code: {get_batch_response.status_code}")
       print(get_batch_response.text)
    
  7. 執行筆記本數據格,您應該會看到兩行列印為 Livy 批次作業建立。

    顯示批次會話建立結果的螢幕快照。

使用 Livy API 批次會話提交spark.sql語句

  1. 新增另一個筆記本數據格,並插入此程序代碼。

    # submit payload to existing batch session
    
    print('Submit a spark job via the livy batch API to ') 
    
    newlakehouseName = "YourNewLakehouseName"
    create_lakehouse = api_base_url_mist + "/workspaces/" + workspace_id + "/items"
    create_lakehouse_payload = {
       "displayName": newlakehouseName,
       "type": 'Lakehouse'
    }
    
    create_lakehouse_response = requests.post(create_lakehouse, headers=headers, json=create_lakehouse_payload)
    print(create_lakehouse_response.json())
    
    payload_data = {
       "name":"livybatchdemo_with"+ newlakehouseName,
       "file":"abfss://YourABFSPathToYourPayload.py", 
       "conf": {
         "spark.targetLakehouse": "Fabric_LakehouseID"
       }
    }
    get_batch_response = requests.post(get_livy_get_batch, headers=headers, json=payload_data)
    
    print("The Livy batch job submitted successful")
    print(get_batch_response.json())
    
  2. 執行筆記本數據格,您應該會看到數行列印為 Livy Batch 作業建立並執行。

    顯示成功提交 Livy Batch 作業之後 Visual Studio Code 中結果的螢幕快照。

  3. 流覽回您的 Lakehouse 以查看變更。

在監視中樞檢視您的作業

您可以選取左側導覽連結中的 [監視],來存取監視中樞以檢視各種 Apache Spark 活動。

  1. 當批次作業完成狀態時,您可以流覽至 [監視] 來檢視會話狀態。

    顯示監視中樞中先前 Livy API 提交的螢幕快照。

  2. 選取並開啟最新的活動名稱。

    顯示監視中樞內最新 Livy API 活動的螢幕快照。

  3. 在此 Livy API 工作階段案例中,您可以看到先前的批次提交、執行詳細數據、Spark 版本和設定。 請注意右上方的已停止狀態。

    顯示監視中樞內最新 Livy API 活動詳細數據的螢幕快照。

若要回顧整個程式,您需要遠端用戶端,例如 Visual Studio Code、Microsoft Entra 應用程式令牌、Livy API 端點 URL、對 Lakehouse 的 Spark 驗證、Lakehouse 中的 Spark 承載,以及最後一個批次 Livy API 會話。