共用方式為


執行您的第一個結構化串流工作負載

本文提供在 Azure Databricks 上執行第一個結構化串流查詢所需基本概念的程式碼範例和說明。 您可以針對近即時和累加處理工作負載使用結構化串流。

結構化串流是驅動 Delta Live Tables中串流 tables 的多種技術之一。 Databricks 建議針對所有新的 ETL、擷取和結構化串流工作負載使用 Delta Live Tables。 請參閱 什麼是 Delta Live Tables?

注意

雖然 Delta Live Tables 提供稍微修改的語法來宣告串流 tables,但設定串流讀取和轉換的一般語法適用於 Azure Databricks 上的所有串流使用案例。 Delta Live Tables 也藉由管理狀態資訊、元數據和許多設定來簡化串流。

使用自動載入器從物件儲存體讀取串流資料

下列範例示範使用自動載入器載入 JSON 資料,以 cloudFiles 表示格式和選項。 [schemaLocation] 選項可啟用 [schema] 的推理和進化。 將下列程式碼貼到 Databricks 筆記本資料格中,然後執行資料格以建立名為 raw_df的串流 DataFrame:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

如同 Azure Databricks 上的其他讀取作業,設定串流讀取實際上不會載入資料。 您必須在串流開始之前觸發資料上的動作。

注意

在串流 DataFrame 上呼叫 display() 會啟動串流工作。 對於大部分結構化串流使用案例,觸發串流的動作應該會將資料寫入接收器。 請參閱結構化串流的生產考量

執行串流轉換

結構化串流支援 Azure Databricks 和 Spark SQL 中可使用的大部分轉換。 您甚至可以 UDF 形式載入 MLflow 模型,並以轉換的形式進行串流預測。

下列程式碼範例會完成簡單的轉換,以使用 Spark SQL 函式擴充內嵌的 JSON 資料:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

產生的 transformed_df 包含在每個記錄到達資料來源時,載入及轉換每個記錄的查詢指示。

注意

結構化串流會將資料來源視為未繫結或無限資料集。 因此,結構化串流工作負載中不支援某些轉換,因為其需要排序無限數量的項目。

大部分彙總和許多聯結都需要使用浮水印、視窗和輸出模式來管理狀態資訊。 請參閱套用浮水印來控制資料處理閾值

執行累加批次寫入 Delta Lake

下列範例會使用指定的檔案路徑和檢查點來寫入 Delta Lake。

重要

請務必針對您所設定的每個串流寫入器指定唯一的檢查點位置。 檢查點會針對您的串流提供唯一的身分識別,並追蹤與串流查詢相關聯的所有已處理記錄和狀態資訊。

觸發程序的 availableNow 設定會指示結構化串流處理來源資料集先前未處理的所有記錄,然後關閉,因此您可以安全地執行下列程式碼,而不必擔心串流執行:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

在此範例中,沒有任何新記錄送達我們的資料來源,因此重複執行此程式碼並不會內嵌新的記錄。

警告

結構化串流執行可防止自動終止關閉計算資源。 若要避免非預期的成本,請務必終止串流查詢。

從 Delta Lake 讀取資料、轉換並寫入 Delta Lake

Delta Lake 提供對使用結構化串流做為來源和接收器的廣泛支援。 請參閱 Delta table 串流讀取與寫入

下列範例示範從 Delta table累加載入所有新記錄的範例語法,join 另一個 Delta table的快照集,並將其寫入 Delta table:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

您必須設定適當的許可權,才能讀取來源 tables,並寫入目標 tables 和指定的檢查點位置。 使用來自資料來源和匯集的相關 values 填入所有以角括弧(<>)表示的 parameters。

注意

Delta Live Tables 提供宣告式語法,完整地用於建立 Delta Lake 管線,並自動處理觸發器和檢查點等屬性。 請參閱 什麼是 Delta Live Tables?

從 Kafka 讀取資料、轉換並寫入 Kafka

Apache Kafka 和其他傳訊匯流排提供一些可供大型資料集使用的最低延遲。 您可以使用 Azure Databricks 將轉換套用至從 Kafka 擷取的資料,然後將資料寫回 Kafka。

注意

將資料寫入雲端物件儲存體會增加額外的延遲負荷。 如果您想要將傳訊匯流排的資料儲存在 Delta Lake 中,但針對串流工作負載需要最低延遲,Databricks 建議設定個別的串流工作來將資料內嵌至 Lakehouse,並針對下游傳訊匯流排接收器套用近即時的轉換。

下列程式代碼範例示範簡單的模式,藉由將 Kafka 中的數據與 Delta table 中的數據聯結,然後回寫至 Kafka,以豐富 Kafka 的數據:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

您必須設定適當的權限,才能存取 Kafka 服務。 請使用數據來源和資料匯入的相關 values,填入所有以角括弧(<>)表示的 parameters。 請參閱《使用 Apache Kafka 和 Azure Databricks 進行串流處理》。