選取結構化串流的輸出模式
本文章討論如何選取可設定狀態的串流的輸出模式。 只有包含彙總的可設定狀態的串流需要輸出模式組態。
聯結僅支援附加輸出模式,而輸出模式不會影響重複資料刪除。 任意可設定狀態的運算符 mapGroupsWithState
與 flatMapGroupsWithState
使用自己的自訂邏輯發出記錄,因此串流的輸出模式不會影響其行為。
針對無狀態串流,所有輸出模式的行為都相同。
若要正確設定輸出模式,您必須瞭解可設定狀態的串流、浮水印和觸發程式。 請參閱以下文章:
什麼是輸出模式?
結構化串流查詢的輸出模式會決定查詢運算符在每個觸發程式期間發出的記錄。 可以發出的三種記錄類型如下:
- 未來處理不會變更的記錄。
- 自上次觸發程式之後變更的記錄。
- 狀態資料表中的所有記錄。
瞭解哪些類型的記錄應發出對於可設定狀態的運算符非常重要,因為可設定狀態的運算符產生的特定行可能會隨著觸發的不同而改變。 例如,當串流彙總運算符針對特定視窗接收更多列時,該視窗的彙總值可能會在觸發程式之間變更。
對於無狀態運算符,記錄類型之間的差異不會影響運算符的行為。 無狀態運算符在觸發程式期間發出的記錄一律是在該觸發程式期間處理的來源記錄。
可用的輸出模式
有三種輸出模式可告訴運算符在特定觸發程式期間發出哪些記錄:
輸出模式 | 描述 |
---|---|
附加模式 (預設) | 根據預設,串流查詢會以附加模式執行。 在此模式中,運算符只會發出未來觸發程式不會變更的數據列。 可設定狀態的運算符會使用浮水印來判斷何時發生此情況。 |
更新模式 | 在更新模式中,運算符會發出觸發程式期間變更的所有數據列,包括發出的記錄可能會在後續觸發程式中變更。 |
完整模式 | 完整模式僅適用於串流彙總。 在完整模式中,運算符所產生的所有數據列都會發出下游。 |
生產考量
對於許多可設定狀態的串流運算,您必須在附加和更新模式之間選擇。 下列各章節概述可能會通知您決策的考慮。
注意
完整模式有一些應用程式,但隨著資料調整而執行效能不佳。 Databricks 建議使用具體化檢視以獲得與完整模式相關的語意保證,並針對許多可設定狀態的運算進行累加處理。 請參閱使用 Databricks SQL 中的具體化檢視。
應用程式語意
應用程式語意描述下游應用程式如何使用串流資料。
如果下游服務需要針對每個下游寫入採取單一執行,請在大部分情況下使用附加模式。 例如,如果您的下游通知服務針對寫入接收器的每個新記錄傳送通知,則附加模式會確保每個記錄只會寫入一次。 每次狀態資訊變更時,更新模式都會寫入記錄,這會導致許多更新。
如果下游服務需要新的結果,更新模式可確保您的接收器盡可能保持最新狀態。 範例包括機器學習模型,可即時讀取功能,或追蹤即時彙總的分析儀表板。
運算子和接收器相容性
結構化串流不支援 Apache Spark 中所有可用的運算,而且所有輸出模式不支援某些串流運算。 如需運算符限制的詳細資訊,請參閱 OSS 串流檔。
並非所有接收器都支援所有輸出模式。 支援所有 Unity 目錄受控資料表的 Delta Lake 和 Kafka 都支援所有輸出模式。 如需其他接收器相容性的詳細資訊,請參閱 <OSS 串流檔>。
延遲和成本
輸出模式會影響寫入記錄之前必須經過多少時間,而寫入的資料頻率和數量可能會影響與串流管線關聯的成本。
附加模式只會強制可設定狀態的運算符在完成可設定狀態的結果之後發出結果,這至少與您的浮水印延遲一樣長。 附加輸出模式中的浮水印延遲 1 hour
表示您的記錄在發出下游之前至少有 1 小時的延遲。
更新模式會導致每個觸發程式每個彙總值一次寫入。 如果您的接收器按每筆記錄寫入次數收費,那麼在浮水印延遲傳遞之前記錄多次更新,可能會相當昂貴。
設定範例
下列程式碼範例示範如何設定將更新串流處理至 Unity 目錄資料表的輸出模式:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
請參閱 PySpark DataStreamWriter.outputMode 或 Scala DataStreamWriter.outputMode 的OSS 檔。
可設定狀態的串流和輸出模式範例
下列範例旨在協助您瞭解輸出模式如何與浮水印互動以進行可設定狀態的串流。
請考慮一個串流彙總操作,該操作計算每小時商店產生的總收入,並設置 15 分鐘的浮水印延遲。 第一個微批次會處理下列記錄:
- $15 下午 2:40
- $10 下午 2:30
- $30 下午 3:10
此時引擎的浮水印是下午 2:55,因為它從看到的最大時間(下午 3:10)減去 15 分鐘(延遲)。 串流彙總運算符的狀態有如下:
[2pm, 3pm]
: $25[3pm, 4pm]
: $30
下表概述每個輸出模式中會發生什麼情況:
輸出模式 | 結果和原因 |
---|---|
附加 | 串流彙總運算符不會發出任何下游。 這是因為這兩個視窗可能會隨著後續觸發程式出現新值而變更:浮水印下午 2:55 表示下午 2:55 之後的記錄可能仍然到達,而且這些記錄可能會落入 [2pm, 3pm] 視窗或 [3pm, 4pm] 視窗。 |
更新 | 運算符會發出這兩筆記錄,因為兩筆記錄都接收到更新。 |
完成 | 運算符會發出所有記錄。 |
現在,假設串流會再接收一筆記錄:
- $20 下午 3:20
浮水印會更新為下午 3:05,因為引擎從下午 3:20 減去 15 分鐘。 此時,串流彙總運算符的狀態有如下:
[2pm, 3pm]
: $25[3pm, 4pm]
: $50
下表概述每個輸出模式中會發生什麼情況:
輸出模式 | 結果和原因 |
---|---|
附加 | 串流彙總運算符會觀察下午 3:05 的浮水印大於 [2pm, 3pm] 視窗的結束。 根據浮水印的定義,該視窗無法再變更,因此會發出 [2pm, 3pm] 視窗。 |
更新 | 串流彙總運算符會發出 [3pm, 4pm] 視窗,因為狀態值已從 $30 變更為 $50。 |
完成 | 運算符會發出所有記錄。 |
下列摘要說明可設定狀態的運算符在每個附加模式中的行為方式:
- 在附加模式中,在浮水印延遲之後寫入記錄一次。
- 在更新模式中,寫入自上一個觸發程式之後已變更的記錄。
- 在完整模式中,寫入可設定狀態的運算符所產生的所有記錄。