讀取結構化串流狀態資訊
重要
這項功能處於公開預覽狀態。
在 Databricks Runtime 14.3 LTS 和更新版本中,您可以使用 DataFrame 作業或 SQL table值函式來查詢結構化串流狀態數據和元數據。 您可使用這些函式來觀察結構化串流具狀態的查詢的狀態資訊,這對於監視和偵錯很有用。
您必須具有串流查詢檢查點路徑的讀取權限,才能查詢狀態資料或中繼資料。 本文所述的函式提供狀態資料和中繼資料的唯讀存取權。 您僅能使用批次讀取語意來查詢狀態資訊。
注意
您無法查詢 Delta Live Tables 管線、資料流 tables或具現化 views的狀態資訊。
讀取結構化串流狀態存放區
您可讀取任何支援的 Databricks Runtime 中所執行結構化串流查詢的狀態存放區資訊。 使用下列語法:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
支援下列設定選擇性的組態:
選項 | 類型 | 預設值 | 說明 |
---|---|---|---|
batchId |
Long | 最新批次識別碼 | 表示要從中讀取的目標批次。 指定此選項來查詢先前查詢狀態的狀態資訊。 必須認可批次,但尚未清理。 |
operatorId |
Long | 0 | 表示要從中讀取的目標運算子。 當查詢使用多個具狀態運算子時,會使用此選項。 |
storeName |
String | “DEFAULT” | 表示要讀取的目標狀態存放區名稱。 當具狀態的運算子使用多個狀態存放區執行個體時,會使用此選項。 流蒸汽 join必須指定 storeName 或 joinSide ,但不能同時指定兩者。 |
joinSide |
字串 (「left」或「right」) | 表示要從中讀取的目標端。 當使用者想要從數據流 join讀取狀態時,會使用此選項。 |
傳回的資料具有以下 schema:
Column | 類型 | 說明 |
---|---|---|
key |
結構 (衍生自狀態索引鍵的進一步類型) | 狀態檢查點中具狀態運算子記錄的索引鍵。 |
value |
結構 (衍生自狀態值的進一步類型) | 狀態檢查點中具狀態運算子記錄的值。 |
partition_id |
整數 | 包含具狀態運算符記錄的狀態檢查點 partition。 |
讀取結構化串流狀態中繼資料
重要
您必須在 Databricks Runtime 14.2 或更新版本上執行串流查詢,才能記錄狀態中繼資料。 狀態中繼資料檔案不會中斷回溯相容性。 如果您選擇在 Databricks Runtime 14.1 或更新版本上執行串流查詢,則會略過現有的狀態中繼資料檔案,也不會寫入任何新的狀態中繼資料檔案。
您可以讀取 Databricks Runtime 14.2 或更新版本上執行的結構化串流查詢狀態中繼資料資訊。 使用下列語法:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
傳回的數據具有下列 schema:
Column | 類型 | 說明 |
---|---|---|
operatorId |
整數 | 具狀態串流運算子的整數識別碼。 |
operatorName |
整數 | 具狀態串流運算子的名稱。 |
stateStoreName |
String | 運算子狀態存放區的名稱。 |
numPartitions |
整數 | 狀態存放區的分割區數目。 |
minBatchId |
Long | 可供查詢狀態的最小批次識別碼。 |
maxBatchId |
Long | 可供查詢狀態的最大批次識別碼。 |
注意
由 minBatchId
和 maxBatchId
提供的批次標識碼 values 反映檢查點寫入時的狀態。 舊批次會隨著微批次執行自動清理,因此此處提供的值不保證仍可供使用。