結構化串流的生產考量
本文章包含使用 Azure Databricks 上的工作排程結構化串流工作負載的建議。
Databricks 建議一律執行下列動作:
- 從筆記本中不必要的程式代碼,例如會傳回結果的 Remove、
display
和count
。 - 請勿使用所有用途計算來執行結構化串流工作負載。 一律使用工作計算將串流排程為工作。
- 使用
Continuous
模式排程工作。 - 請勿針對結構化串流工作啟用計算的自動調整。
某些工作負載受益於下列各項:
Azure Databricks 引進了 Delta Live Tables,以減少管理結構化串流工作負載生產基礎結構的複雜性。 Databricks 建議針對新的結構化串流管線使用 Delta Live Tables。 請參閱 什麼是 Delta Live Tables?。
注意
在縮小結構化串流工作負載的叢集大小時,計算自動調整有其限制。 Databricks 建議對串流工作負載使用配備增強自動調整功能的 Delta Live Tables。 查看增強型自動調整的 Delta Live
為串流工作負載設計預期失敗的能力
Databricks 建議一律設定串流工作,以在失敗時自動重新啟動。 某些功能,包括 schema 演進,假設結構化串流工作負載已設定為自動重試。 請參閱 <設定結構化串流工作,以在失敗時重新啟動串流查詢>。
某些運算,例如 foreachBatch
提供至少一次,而不是確切一次的保證。 針對這些運算,您應該讓處理管線具有等冪。 請參閱<使用 foreachBatch 寫入任意資料接收器>。
注意
當查詢重新啟動時,會處理上一次執行期間計劃的微批次。 如果您的工作因記憶體不足錯誤而失敗,或因超大型的微批次而手動取消工作,您可能需要相應擴大計算,才能成功處理微批次。
如果您變更執行之間的組態,這些設定會套用至第一個新批次方案。 請參閱 <在結構化串流查詢變更之後的復原>。
工作何時重試?
您可以將多個工作排程為 Azure Databricks 工作的一部分。 當您使用連續觸發程式設定作業時,無法 set 工作之間的相依性。
您可以選擇使用下列其中一種方法,在單一工作中排程多個串流:
- 多個工作:定義具有使用連續觸發執行串流工作負載之多個工作的工作。
- 多個查詢:在單一工作的原始程式碼中定義多個串流查詢。
您也可以合併這些策略。 下列 table 會比較這些方法。
多項工作 | 多個查詢 | |
---|---|---|
如何共用計算? | Databricks 建議將工作的計算大小適當地部署到每個串流工作。 您可以選用跨工作共用計算。 | 所有查詢都會共用相同的計算。 您可以選用將查詢指派給 排程者集區。 |
重試如何處理? | 所有工作都必須在工作重試之前失敗。 | 如果有任何查詢失敗,工作會重試。 |
設定結構化串流工作以在失敗時重新啟動串流查詢
Databricks 建議使用連續觸發設定所有串流工作負載。 請參閱持續執行作業。
連續觸發預設會提供下列行為:
- 防止多個並行執行工作。
- 在上一次執行失敗時開始新的執行。
- 使用指數輪詢重試。
Databricks 建議在排程工作流程時一律使用工作計算,而不是所有用途的計算。 在工作失敗並重試時,會部署新的計算資源。
注意
您不需要使用 streamingQuery.awaitTermination()
或 spark.streams.awaitAnyTermination()
。 當串流查詢處於作用中狀態時,工作會自動防止執行完成。
針對多個串流查詢使用排程者集區
您可以從相同的原始程式碼執行多個串流查詢時,設定排程集區將計算容量指派給查詢。
根據預設,筆記本中開始的所有查詢都會在相同的公平排程集區中執行。 來自筆記本中所有串流查詢的觸發所產生的 Apache Spark 工作會以「先進先出」(FIFO) 順序逐一執行。 這可能會導致查詢中不必要的延遲,因為它們無法有效率地共用叢集資源。
排程者集區允許您宣告哪些結構化串流查詢共用計算資源。
下列範例將 query1
指派至專用集區,而 query2
與 query3
則共用排程者集區。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
注意
本地屬性配置必須與您啟動串流查詢的筆記本單元格 where 相同。
如需其他詳細資料,請參閱 <Apache 公平排程者文件>。