共用方式為


在工作流程中執行 Delta Live Tables 管線

您可以使用 Databricks 工作、Apache Airflow 或 Azure Data Factory,在資料處理工作流程中執行 Delta Live Tables 管線。

工作

您可以在 Databricks 工作中協調多個工作,以實作資料處理工作流程。 若要在工作中包含 Delta Live Tables 管線,請在建立工作時使用管線工作。 請參閱Delta Live Tables 管線工作的作業

Apache Airflow

Apache Airflow 是管理及排程資料工作流程的開放原始碼解決方案。 Airflow 以有向非循環圖 (DAG) 的形式表示作業的工作流程。 您可以在 Python 檔案中定義工作流程,而 Airflow 會管理排程和執行。 如需搭配 Azure Databricks 安裝和使用 Airflow 的資訊,請參閱使用 Apache Airflow 協調 Azure Databricks 工作

若要在 Airflow 工作流程中執行 Delta Live Tables 管線,請使用 DatabricksSubmitRunOperator

需求

以下是使用 Delta Live Tables 的 Airflow 支援的必要項目:

範例

下列範例會建立 Airflow DAG,以識別碼 8279d543-063c-4d63-9926-dae38e35ce8b 觸發 Delta Live Tables 管線的更新:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_ID 取代為 Airflow 連線到您工作區的識別碼。

將此範例儲存在目錄中,airflow/dags 並使用 Airflow UI 來 檢視和觸發 DAG。 使用 Delta Live Tables UI 來檢視管線更新的詳細資料。

Azure Data Factory

Azure Data Factory 是雲端式 ETL 服務,可讓您協調資料整合與轉換工作流程。 Azure Data Factory 直接支援在工作流程中執行 Azure Databricks 工作,包括 筆記本、JAR 工作和 Python 指令碼。 您也可以從 Azure Data Factory 網路活動呼叫 Delta Live Tables API,在工作流程中包含管線。 例如,若要從 Azure Data Factory 觸發管線更新:

  1. 建立資料處理站或開啟現有的資料處理站。

  2. 建立完成時,開啟資料處理站的頁面,然後按下 [開啟 Azure Data Factory Studio] 圖格。 Azure Data Factory 使用者介面隨即出現。

  3. 從 Azure Data Factory Studio 使用者介面的 [新增] 下拉功能表中選取 [管線],以建立新的 Azure Data Factory 管線。

  4. 在 [活動] 工具箱中,展開 [一般],然後將 [網頁] 活動拖曳到管線創作區。 按下 [設定] 索引標籤,然後輸入下列值:

    注意

    作為安全性最佳做法,當您使用自動化工具、系統、指令碼和應用程式進行驗證時,Databricks 建議您使用屬於服務主體的個人存取權杖,而不是工作區使用者。 若要建立服務主體的權杖,請參閱管理服務主體的權杖

    • URLhttps://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

      取代 <get-workspace-instance>

      <pipeline-id> 取代為管線識別碼。

    • 方法:從下拉式功能表中選取 POST

    • 標頭:按下 [+ 新增]。 在 [使用者名稱] 文字方塊中,輸入 Authorization。 在[值] 文字方塊中,輸入 Bearer <personal-access-token>

      以 Azure Databricks 個人存取權杖取代 <personal-access-token>

    • 本文:若要傳遞其他要求參數,請輸入包含參數的 JSON 文件。 例如,若要啟動更新並重新處理管線的所有資料:{"full_refresh": "true"}。 如果沒有其他要求參數,請輸入空白大括弧 ({})。

若要測試網路活動,請按下 Data Factory UI 中管線工具列上的 [偵錯]。 執行的輸出和狀態,包括錯誤,會顯示在 Azure Data Factory 管線的 [輸出] 索引標籤中。 使用 Delta Live Tables UI 來檢視管線更新的詳細資料。

提示

常見的工作流程需求是在上一個工作完成之後啟動工作。 由於 Delta Live Tables updates 要求是非同步的—要求會在啟動更新之後傳回,但在更新完成之前—Azure Data Factory 管線中的工作必須等候更新完成。 等候更新完成的選項是在觸發 Delta Live Tables 更新的網路活動之後新增 Until 活動。 在 Until 活動中:

  1. 新增等候活動以等候設定的秒數,以完成更新。
  2. 在等候活動之後新增網路活動,以使用 Delta Live Tables 取得更新詳細資料要求來取得更新的狀態。 回覆中的 state 欄位會傳回更新的目前狀態,包括是否已完成。
  3. 使用 state 欄位的值來設定 Until 活動的終止條件。 您也可以使用 [設定變數] 活動,根據 state 值新增管線變數,並將此變數用於終止條件。