共用方式為


使用 Delta Live Tables 流程逐步載入和處理資料

本文說明什麼是流程,以及如何在 Delta Live Tables 管線中使用流程,將數據從來源逐步處理至目標串流 table。 在 Delta Live Tables中,流程會以兩種方式定義:

  1. 當您建立能夠更新串流 table的查詢時,會自動定義一個流程。
  2. Delta Live Tables 也提供功能來明確定義流程,以執行更複雜的處理,例如將多個串流來源合併到單一串流 table。

本文討論當您定義查詢以 update 串流 table時所建立的隱含流程,並詳細說明語法以定義更複雜的流程。

什麼是流程?

在 Delta Live Tables中,流程 是串流查詢,會以累加方式處理源數據,以 update 目標串流 table。 您在管線中建立的大部分 Delta Live Tables 數據集都會將流程定義為查詢的一部分,而且不需要明確定義流程。 例如,您在 Delta Live Tables 中僅使用單一 DDL 命令即可建立串流 table,而不是使用個別的 table 和流程語句來建立串流 table。

注意

CREATE FLOW 範例僅供說明之用,並包含無效 Delta Live Tables 語法的關鍵詞。

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

除了查詢所定義的預設流程之外,Delta Live Tables Python 和 SQL 介面還提供 附加流程 功能。 追加流程支援需要從多個串流來源讀取數據的處理程序,將其匯入至單一串流 updatetable。 例如,當您擁有現有的串流 table 和流程時,可以使用附加流程功能,並想要新增寫入此現有串流 table的新串流來源。

使用附加流程從多個來源數據流寫入串流 table

使用 Python 介面中的 @append_flow 裝飾器,或 SQL 介面中的 CREATE FLOW 子句,從多個串流來源寫入至串流 table。 使用附加流程來處理工作,例如:

  • 新增將資料附加至現有串流 table 的串流來源,而不需要完整 refresh。 例如,您可能擁有一個 table,結合您運營的每個區域的區域數據。 隨著新區域推出,您可以將新的區域數據新增至 table,而不需要執行完整的 refresh。 請參閱範例 :從多個 Kafka 主題將資料寫入至串流 table
  • Update 串流 table,透過添加遺漏的歷史數據(回填)。 例如,您有由 Apache Kafka 主題寫入的現有串流 table。 您也有一些歷史數據儲存在 table 中,您需要這些數據準確無誤地插入一次到串流 table中,並且無法串流這些數據,因為在插入之前需要執行複雜的聚合處理。 請參閱 範例:執行一次性數據回填
  • 結合多個來源的數據並寫入單一串流 table,而不是在查詢中使用 UNION 子句。 使用累加流程處理,而不是 UNION,您可以逐步 update 目標 table,無需運行 完整的 refreshupdate。 請參閱 範例:使用附加流程處理,而不是 UNION

附加流程處理所輸出記錄的目標可以是現有的 table 或新的 table。 針對 Python 查詢,請使用 create_streaming_table() 函式來建立目標 table。

重要

  • 如果您需要使用 預期來定義資料品質條件約束,請將目標 table 的期望定義為 create_streaming_table() 函式或現有 table 定義的一部分。 您無法在 @append_flow 定義中定義預期。
  • 流程是由流程名稱識別,而此名稱用來識別串流檢查點。 使用流程名稱來識別檢查點表示下列各項:
    • 如果管線中的現有流程已重新命名,檢查點不會延續,且重新命名的流程實際上是全新的流程。
    • 您無法重複使用管線中的流程名稱,因為現有的檢查點不符合新的流程定義。

以下是 的 @append_flow語法:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

範例:將多個 Kafka topic 寫入串流 table

下面的範例會建立一個名為 kafka_target 的串流 table,並將資料從兩個 Kafka 主題寫入該串流 table。

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

若要深入瞭解 SQL 查詢中使用的 read_kafka()table值函式,請參閱 SQL 語言參考中的 read_kafka

範例:執行一次性數據回填

下列範例會執行查詢,將歷程記錄數據附加至串流 table:

注意

若要確保回填查詢在屬於排程或持續運行的管線中時仍能達成一次性回填,請在管線運行一次之後進行 remove 查詢。 若要在到達回填目錄時附加新數據,請將查詢保留原位。

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

範例:使用附加流程處理,而不是 UNION

您可以使用追加流程查詢來結合多個來源,並寫入至單一資料流 table,而不是使用具有 UNION 子句的查詢。 使用附加流程查詢,而不是 UNION 可讓您從多個來源附加至串流 table,而不需要執行 完整 refresh

下列 Python 範例包含結合多個數據源與 UNION 子句的查詢:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

下列範例會將 UNION 查詢取代為附加流程查詢:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );