在 Azure Databricks 設定 RocksDB 狀態存放區
您可以在啟動串流查詢之前,先在SparkSession 中設定下列設定,以啟用RocksDB型狀態管理。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
您可以在 Delta Live Tables 管線上啟用 RocksDB。 請參閱 優化管線組態以進行具狀態處理。
啟用變更記錄檢查點
在 Databricks Runtime 13.3 LTS 和更新版本中,您可以啟用變更記錄檢查點,以降低結構化串流工作負載的檢查點持續時間和端對端延遲。 Databricks 建議為所有結構化串流具狀態的查詢,啟用變更記錄檢查點。
傳統上,RocksDB 狀態存放區快照集,並在檢查點期間上傳數據檔。 為了避免這項成本,changelog 檢查點只會寫入自上次檢查點之後變更為永久性記憶體的記錄。
預設會停用 Changelog 檢查點。 您可以使用下列語法,在 SparkSession 層級中啟用變更記錄檢查點:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
您可以在現有的數據流上啟用變更記錄檢查點,並維護儲存在檢查點中的狀態資訊。
重要
已啟用變更記錄檢查點的查詢只能在 Databricks Runtime 13.3 LTS 和更新版本上執行。 您可以停用變更記錄檢查點以還原為舊版檢查點行為,但您必須繼續在 Databricks Runtime 13.3 LTS 或更新版本上執行這些查詢。 您必須重新啟動作業,才能進行這些變更。
RocksDB 狀態存放區計量
每個狀態運算子都會收集與其 RocksDB 實例上執行之狀態管理作業相關的計量,以觀察狀態存放區,並可能協助偵錯作業速度緩慢。 這些計量會匯總每個狀態運算符在作業中執行狀態運算子的所有工作。 這些計量是 中欄位內地圖的stateOperators
StreamingQueryProgress
一部分customMetrics
。 以下是 JSON 格式的範例 StreamingQueryProgress
(使用 StreamingQueryProgress.json()
取得)。
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
計量的詳細描述如下:
度量名稱 | 描述 |
---|---|
rocksdbCommitWriteBatchLatency | 將記憶體內部結構中的分段寫入(WriteBatch)套用到原生 RocksDB 所花費的時間(在米利斯)。 |
rocksdbCommitFlushLatency | 將 RocksDB 記憶體內部變更排清至本機磁碟所花費的時間(以 millis 為單位)。 |
rocksdbCommitCompactLatency | 在檢查點認可期間,時間(在米利斯)進行壓縮(選擇性)。 |
rocksdbCommitPauseLatency | 在檢查點認可中停止背景工作者線程的時間(以壓縮等方式)所花費的時間。 |
rocksdbCommitCheckpointLatency | 建立原生 RocksDB 快照集並寫入本機目錄所花費的時間(在 millis 中)。 |
rocksdbCommitFileSyncLatencyMs | 將原生 RocksDB 快照集相關的檔案同步至外部記憶體 (檢查點位置) 所花費的時間(在 millis 中)。 |
rocksdbGetLatency | 每個基礎原生 RocksDB::Get 呼叫的平均時間(以 nanos 為單位)。 |
rocksdbPutCount | 每個基礎原生 RocksDB::Put 呼叫的平均時間(以 nanos 為單位)。 |
rocksdbGetCount | 原生 RocksDB::Get 呼叫數目 (不包含 Gets WriteBatch - 用於預備寫入的記憶體批次中)。 |
rocksdbPutCount | 原生 RocksDB::Put 呼叫數目 (不包括 Puts WriteBatch - 用於暫存寫入的記憶體批次中)。 |
rocksdbTotalBytesReadByGet | 透過原生 RocksDB::Get 呼叫讀取的未壓縮位元組數目。 |
rocksdbTotalBytesWrittenByPut | 透過原生 RocksDB::Put 呼叫寫入的未壓縮位元組數目。 |
rocksdbReadBlockCacheHitCount | 原生 RocksDB 區塊快取用來避免從本機磁碟讀取數據的次數。 |
rocksdbReadBlockCacheMissCount | 原生 RocksDB 區塊快取遺漏且需要從本機磁碟讀取數據的次數。 |
rocksdbTotalBytesReadByCompaction | 原生 RocksDB 壓縮程式從本機磁碟讀取的位元組數目。 |
rocksdbTotalBytesWrittenByCompaction | 原生 RocksDB 壓縮程式寫入本機磁碟的位元組數目。 |
rocksdbTotalCompactionLatencyMs | RocksDB 壓縮所花費的時間(背景和在認可期間起始的選擇性壓縮)。 |
rocksdbWriterStallLatencyMs | 時間 (在米利斯) 寫入器因背景壓縮或排清到磁碟而停滯不前。 |
rocksdbTotalBytesReadThroughIterator | 某些具狀態作業(例如視窗匯總中的逾時處理 flatMapGroupsWithState 或浮水印)需要透過反覆運算器在 DB 中讀取整個數據。 使用反覆運算器讀取未壓縮數據的總大小。 |