共用方式為


什麼是異步進度追蹤?

重要

這項功能 公開預覽版

異步進度追蹤使得結構化串流管線能夠異步且平行於微批次的實際數據處理之中進行進度檢查,從而減少維護 offsetLogcommitLog所帶來的延遲。

異步進度追蹤

注意

異步進度追蹤不適用 Trigger.onceTrigger.availableNow 觸發器。 嘗試使用這些觸發程式啟用此功能會導致查詢失敗。

異步進度追蹤如何減少延遲?

結構化串流依賴保存和管理位移作為查詢處理的進度指標。 Offset 管理作業直接影響處理延遲,因為除非這些作業完成,否則不會發生數據處理。 異步進度追蹤可讓結構化串流管線進行檢查點進度,而不會受到這些 offset 管理作業的影響。

何時應該設定檢查點頻率?

用戶可以設定進度檢查點的頻率。 檢查點頻率的預設設定可為大多數查詢提供良好的輸送量。 設定頻率對於那些 offset 管理作業的發生速率高於可處理能力的情況非常有幫助,因為這會導致 offset 管理作業的不斷增加的積壓。 為了減少這種不斷增長的積壓工作,數據處理可能會被阻擋或變慢,基本上相當於還原處理模式,取消異步進度追蹤的好處。

注意

失敗復原時間隨著檢查點間隔時間的增加而增加。 如果失敗,管線必須重新處理先前成功檢查點之前的所有數據。 用戶可以考慮在一般處理期間降低延遲與復原時間之間的這種取捨,以防發生失敗。

哪些設定與異步進度追蹤相關聯?

選擇 價值 預設 描述
非同步進度跟蹤已啟用 (asyncProgressTrackingEnabled) 真/假 啟用或停用異步進度追蹤
asyncProgressTrackingCheckpointIntervalMs (異步進度跟蹤檢查點間隔毫秒) 毫秒 1000 我們提交位移和完成提交的時間間隔

使用者如何啟用異步進度追蹤?

使用者可以使用類似下列程式代碼的程式代碼來啟用此功能:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

關閉異步進度追蹤

啟用異步進度追蹤時,框架不會為每個批次設定檢查點進度。 若要解決此問題,請在停用異步進度追蹤之前,使用下列設定處理至少兩個微批次:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

在至少兩個微批次完成處理之後停止查詢。 現在您可以安全地停用異步進度追蹤,然後重新啟動查詢。

如果您已停用異步進度追蹤,但未完成此步驟,可能會遇到下列錯誤:

java.lang.IllegalStateException: batch x doesn't exist

在驅動程式記錄檔中,您可能會看到下列錯誤:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

依照本節中的指示停用異步進度追蹤,可讓您解決這些錯誤並修復串流工作負載。

異步進度追蹤的限制

這項功能有下列限制:

  • 只有在使用 Kafka 作為接收端時,才支援無狀態管線的異步進度追蹤。
  • 端對端的處理不保證一定完成一次,因為在失敗的情況下,批次的 offset 範圍可能會被更改,導致異步進度追蹤。 某些接收器,例如 Kafka,無法提供精確一次的保證。