共用方式為


套用浮水印來控制資料處理閾值

本文介紹浮浮水印的基本概念,並提供在一般具狀態串流作業中使用浮浮浮水印的建議。 您必須將浮水印套用至具狀態串流作業,以避免無限地擴充保留在狀態中的數據量,這可能會造成記憶體問題,並在長時間執行的串流作業期間增加處理延遲。

什麼是浮浮浮浮水印?

結構化串流會使用浮浮浮浮水印來控制要持續處理指定狀態實體更新多久的臨界值。 狀態實體的常見範例包括:

  • 時間範圍中的匯總。
  • 兩個數據流之間聯結的唯一索引鍵。

當您宣告浮水印時,您會在串流數據框架上指定時間戳欄位和浮水印閾值。 當新數據送達時,狀態管理員會追蹤指定欄位中最新的時間戳,並在延遲閾值內處理所有記錄。

下列範例會將 10 分鐘的水位線閾值套用至視窗計數:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

在此範例中:

  • 數據 event_time 行用來定義 10 分鐘的浮水印和 5 分鐘的輪轉視窗。
  • 針對每個非重疊的 5 分鐘視窗,會收集每個 id 觀察到的計數。
  • 狀態資訊會針對每個計數進行維護,直到時間範圍結束 10 分鐘,比觀察到 event_time的最新 。

重要

浮浮浮水印臨界值保證會根據所定義查詢的語意來處理抵達指定閾值內的記錄。 抵達指定臨界值以外的延遲記錄仍可使用查詢計量進行處理,但這並不保證。

浮浮水印如何影響處理時間和輸送量?

浮水印會與輸出模式互動,以控制資料寫入接收器的時機。 由於浮水印會減少要處理的狀態資訊總量,因此有效使用浮水印對於有效率的具狀態串流輸送量至關重要。

注意

並非所有的輸出模式都支援所有具狀態作業。

視窗匯總的浮浮浮浮水印和輸出模式

下表詳細說明使用已定義浮水印之時間戳匯總的查詢處理:

輸出模式 行為
附加 一旦水位線閾值通過,數據列就會寫入目標數據表。 所有寫入都會根據延遲閾值延遲。 一旦超過閾值,就會卸除舊的匯總狀態。
更新 數據列會在計算結果時寫入目標數據表,而且可在新數據送達時更新和覆寫。 一旦超過閾值,就會卸除舊的匯總狀態。
完成 匯總狀態不會卸除。 系統會使用每個觸發程式重寫目標數據表。

數據流聯結的浮浮浮浮水印和輸出

多個數據流之間的聯結僅支援附加模式,且相符的記錄會在探索到的每個批次中寫入。 針對內部聯結,Databricks 建議在每個串流數據源上設定水位線閾值。 這可讓舊記錄捨棄狀態資訊。 如果沒有浮水印,結構化串流會嘗試從聯結的兩端與每個觸發程式聯結每個索引鍵。

結構化串流具有特殊的語意,可支援外部聯結。 外部聯結必須進行浮浮水印處理,因為它表示在進行不相符之後,何時必須以 Null 值寫入索引鍵。 請注意,雖然外部聯結對於記錄在數據處理期間永遠不會相符的記錄很有用,因為聯結只會寫入數據表做為附加作業,但直到延遲閾值過後才會記錄此遺漏的數據。

在結構化串流中使用多個水位線原則控制晚期數據閾值

使用多個結構化串流輸入時,您可以設定多個浮水印來控制延遲抵達數據的容錯閾值。 設定浮浮浮浮水印可讓您控制狀態資訊並影響延遲。

串流查詢可以有多個聯集或聯結在一起的輸入數據流。 每個輸入數據流都可以有不同的延遲數據臨界值,這些閾值必須容許進行具狀態作業。 在每個輸入數據流上使用 來指定這些臨界 withWatermarks("eventTime", delay) 值。 以下是具有 數據流聯結的範例查詢。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

執行查詢時,結構化串流會個別追蹤每個輸入數據流中所看到的最大事件時間、根據對應的延遲計算浮水印,並選擇單一全域浮水印,並將它們用於具狀態作業。 根據預設,如果其中一個數據流落後於其他數據流,則選擇最小值為全域水位線,因為它可確保如果其中一個數據流落後於其他數據流,則不會意外捨棄任何數據(例如,其中一個數據流會因為上游失敗而停止接收數據)。 換句話說,全域水位線會以最慢的數據流速度安全地移動,並據此延遲查詢輸出。

如果您想要取得更快的結果,您可以設定多個浮水印原則,藉由將 SQL spark.sql.streaming.multipleWatermarkPolicy max 組態設定為 ,以選擇最大值作為全域水位線(預設值為 min)。 這可讓全球水位線以最快的數據流速度移動。 不過,此設定會從最慢的數據流卸除數據。 因此,Databricks 建議您明智地使用此設定。

在浮浮浮水印內卸除重複專案

在 Databricks Runtime 13.3 LTS 和更新版本中,您可以使用唯一標識符在水位線閾值內重複數據刪除記錄。

結構化串流提供完全一次的處理保證,但不會自動從數據源刪除記錄。 您可以使用 dropDuplicatesWithinWatermark 在任何指定的欄位上重複資料刪除記錄,讓您從數據流中移除重複專案,即使某些欄位不同(例如事件時間或抵達時間)。

保證會卸除抵達指定浮水印內的重複記錄。 此保證只有一個方向嚴格,而且可能也會捨棄到達指定閾值以外的重複記錄。 您必須設定浮水印的延遲閾值,超過重複事件之間的時間戳差異上限,以移除所有重複專案。

您必須指定浮水印以使用 dropDuplicatesWithinWatermark 方法,如下列範例所示:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])