結構化串流檢查點
檢查點和預寫記錄檔會一起運作,為結構化串流工作負載提供處理保證。 檢查點會追蹤識別查詢的資訊,包括狀態資訊和已處理的記錄。 刪除檢查點目錄中的檔案或變更為新的檢查點位置時,查詢的下一次執行會全新開始。
每個查詢都必須有不同的檢查點位置。 多個查詢不應該共用相同的位置。
啟用結構化串流查詢的檢查點
您必須在執行串流查詢之前指定 checkpointLocation
選項,如下列範例所示:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
注意
如果您省略此選項,某些接收器 (例如筆記本和 memory
接收器中的 display()
輸出) 會自動產生暫時檢查點位置。 這些暫存檢查點位置不會確保任何容錯或資料一致性保證,而且可能無法正確清理。 Databricks 建議一律為這些接收器指定檢查點位置。
在結構化串流查詢中的變更之後復原
從相同檢查點位置重新啟動之間的串流查詢中允許哪些變更,有一些限制。 以下是一些不允許的變更,或變更的效果未妥善定義。 針對所有變更:
- allowed 字詞表示您可以執行指定的變更,但其效果的語意是否妥善定義取決於查詢和變更。
- not allowed 字詞表示您不應該執行指定的變更,因為重新啟動的查詢可能會因為無法預測的錯誤而失敗。
sdf
表示以sparkSession.readStream
產生的串流資料框架/資料集。
結構化串流查詢中的變更類型
- 輸入來源的數量或類型變更 (也就是不同的來源):這是不允許的。
- 輸入來源參數的變更:這是否允許,以及變更的語意是否妥善定義,取決於來源和查詢。 以下是一些範例。
允許新增、刪除和修改速率限制:
spark.readStream.format("kafka").option("subscribe", "article")
打給
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
訂閱文章和檔案的變更通常不允許,因為結果無法預測:
spark.readStream.format("kafka").option("subscribe", "article")
至spark.readStream.format("kafka").option("subscribe", "newarticle")
- 觸發間隔中的變更:您可以變更增量批次與時間間隔之間的觸發程序。 請參閱<變更執行之間的觸發程序間隔>。
- 輸出接收器類型的變更:允許在幾個特定接收器組合之間進行變更。 這需要逐案驗證。 以下是一些範例。
- 允許檔案接收器至 Kafka 接收器。 Kafka 只會看到新的資料。
- 不允許 Kafka 接收器至檔案接收器。
- Kafka 接收器已變更為 foreach,反之亦然。
- 輸出接收器參數的變更:這是否允許,以及變更的語意是否妥善定義,取決於接收器和查詢。 以下是一些範例。
- 不允許變更檔案接收器的輸出目錄:
sdf.writeStream.format("parquet").option("path", "/somePath")
至sdf.writeStream.format("parquet").option("path", "/anotherPath")
- 允許對輸出主題進行變更:
sdf.writeStream.format("kafka").option("topic", "topic1")
至sdf.writeStream.format("kafka").option("topic", "topic2")
- 允許對使用者定義的 foreach 接收器進行變更 (也就是
ForeachWriter
程式碼),但變更的語意取決於程式碼。
- 不允許變更檔案接收器的輸出目錄:
- 投影/篩選/對應等作業的變更:允許部分情況。 例如:
- 允許新增/刪除篩選條件:
sdf.selectExpr("a")
至sdf.where(...).selectExpr("a").filter(...)
。 - 允許具有相同輸出結構描述的投影變更:
sdf.selectExpr("stringColumn AS json").writeStream
至sdf.select(to_json(...).as("json")).writeStream
。 - 有條件地允許具有不同輸出結構描述的投影變更:只有在輸出接收器允許結構描述從
"a"
變更為"b"
時,才允許sdf.selectExpr("a").writeStream
變更至sdf.selectExpr("b").writeStream
。
- 允許新增/刪除篩選條件:
- 具狀態作業的變更:串流查詢中的某些作業需要維護狀態資料,才能持續更新結果。 結構化串流會自動對容錯儲存體進行狀態資料檢查點檢查 (例如 DBFS、Azure Blob 儲存體),並在重新啟動後還原。 不過,這會假設狀態資料的結構描述在重新啟動時維持不變。 這表示在重新啟動之間不允許對串流查詢的具狀態作業進行任何變更 (也就是新增、刪除或結構描述修改)。 以下是在重新啟動之間不應變更其結構描述的具狀態作業清單,以確保狀態還原:
- 串流彙總:例如
sdf.groupBy("a").agg(...)
。 不允許對群組索引鍵或彙總的數量或類型進行任何變更。 - 串流重複資料刪除:例如
sdf.dropDuplicates("a")
。 不允許對群組索引鍵或彙總的數量或類型進行任何變更。 - 串流至串流聯結:例如
sdf1.join(sdf2, ...)
(亦即這兩個輸入都是使用sparkSession.readStream
產生)。 不允許變更結構描述或相等聯結資料行。 不允許聯結類型 (外部或內部) 中的變更。 聯結條件中的其他變更定義不正確。 - 任意具狀態作業:例如
sdf.groupByKey(...).mapGroupsWithState(...)
或sdf.groupByKey(...).flatMapGroupsWithState(...)
。 不允許對使用者定義狀態的結構描述進行任何變更,而且不允許逾時類型。 允許使用者定義狀態對應函數內的任何變更,但變更的語意效果取決於使用者定義的邏輯。 如果您真的想要支援狀態結構描述變更,可以使用支援結構描述移轉的編碼/解碼結構描述,將複雜的狀態資料結構明確地編碼/解碼成位元組。 例如,如果您將狀態儲存為 Avro 編碼的位元組,就可以在查詢重新啟動之間變更 Avro-state-schema,因為這樣會還原二進位狀態。
- 串流彙總:例如