Configurare l'archivio di stato RocksDB su Azure Databricks
È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente in SparkSession prima di avviare la query di streaming.
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
È possibile abilitare RocksDB nelle pipeline di tabelle live Delta. Vedere Ottimizzare la configurazione della pipeline per l'elaborazione con stato.
Abilitare il checkpoint del log delle modifiche
In Databricks Runtime 13.3 LTS e versioni successive è possibile abilitare il checkpoint del log delle modifiche per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro Structured Streaming. Databricks raccomanda di abilitare il checkpoint del log delle modifiche per tutte le query con stato di Structured Streaming.
In genere, gli snapshot dell'archivio stati di RocksDB e caricano i file di dati durante il checkpoint. Per evitare questo costo, il checkpoint del log delle modifiche scrive solo i record che sono stati modificati dall'ultimo checkpoint nell'archiviazione durevole."
Il checkpoint del log delle modifiche è disabilitato per impostazione predefinita. È possibile abilitare il checkpoint del log delle modifiche nel livello SparkSession usando la sintassi seguente:
spark.conf.set(
"spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
È possibile abilitare il checkpoint del log delle modifiche in un flusso esistente e mantenere le informazioni sullo stato archiviate nel checkpoint.
Importante
Le query che hanno abilitato il checkpoint del log delle modifiche possono essere eseguite solo in Databricks Runtime 13.3 LTS e versioni successive. È possibile disabilitare il checkpoint del log delle modifiche per ripristinare il comportamento di checkpoint legacy, ma è necessario continuare a eseguire queste query in Databricks Runtime 13.3 LTS o versione successiva. È necessario riavviare il processo affinché queste modifiche vengano apportate.
Metriche dell'archivio stati di RocksDB
Ogni operatore di stato raccoglie le metriche correlate alle operazioni di gestione dello stato eseguite nell'istanza di RocksDB per osservare l'archivio stati e potenzialmente contribuire al debug della lentezza del processo. Queste metriche vengono aggregate (somma) per operatore di stato nel processo in tutte le attività in cui è in esecuzione l'operatore di stato. Queste metriche fanno parte della customMetrics
mappa all'interno dei stateOperators
campi in StreamingQueryProgress
. Di seguito è riportato un esempio di StreamingQueryProgress
in formato JSON (ottenuto usando StreamingQueryProgress.json()
).
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
Le descrizioni dettagliate delle metriche sono le seguenti:
Nome metrica | Descrizione |
---|---|
rocksdbCommitWriteBatchLatency | Tempo (in millis) richiesto per applicare le scritture di staging nella struttura in memoria (WriteBatch) a RocksDB nativo. |
rocksdbCommitFlushLatency | Tempo (in millis) impiegato per scaricare le modifiche in memoria di RocksDB nel disco locale. |
rocksdbCommitCompactLatency | Tempo (in millis) impiegato per la compattazione (facoltativo) durante il commit del checkpoint. |
rocksdbCommitPauseLatency | Tempo (in millis) impiegato per arrestare i thread di lavoro in background (per la compattazione e così via) come parte del commit del checkpoint. |
rocksdbCommitCheckpointLatency | Tempo (in millis) impiegato per creare uno snapshot di RocksDB nativo e scriverlo in una directory locale. |
rocksdbCommitFileSyncLatencyMs | Tempo (in millis) impiegato per la sincronizzazione dei file correlati allo snapshot di RocksDB nativo in un archivio esterno (posizione del checkpoint). |
rocksdbGetLatency | Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Get sottostante. |
rocksdbPutCount | Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Put sottostante. |
rocksdbGetCount | Numero di chiamate native RocksDB::Get (non include Gets writeBatch - batch in memoria usato per le scritture di staging). |
rocksdbPutCount | Numero di chiamate native RocksDB::Put (non include Puts writeBatch- in batch di memoria usato per le scritture di staging). |
rocksdbTotalBytesReadByGet | Numero di byte non compressi letti tramite chiamate native RocksDB::Get . |
rocksdbTotalBytesWrittenByPut | Numero di byte non compressi scritti tramite chiamate native RocksDB::Put . |
rocksdbReadBlockCacheHitCount | Numero di volte in cui viene usata la cache a blocchi di RocksDB nativa per evitare la lettura dei dati dal disco locale. |
rocksdbReadBlockCacheMissCount | Numero di volte in cui la cache del blocco RocksDB nativa non è stata rilevata e richiesta la lettura dei dati dal disco locale. |
rocksdbTotalBytesReadByCompaction | Numero di byte letti dal disco locale dal processo di compattazione di RocksDB nativo. |
rocksdbTotalBytesWrittenByCompaction | Numero di byte scritti nel disco locale dal processo di compattazione di RocksDB nativo. |
rocksdbTotalCompactionLatencyMs | Tempo (in millis) impiegato per le compattazioni RocksDB (sia in background che nella compattazione facoltativa avviata durante il commit). |
rocksdbWriterStallLatencyMs | Tempo (in millis) il writer si è bloccato a causa di una compattazione in background o dello scaricamento delle memtable su disco. |
rocksdbTotalBytesReadThroughIterator | Alcune delle operazioni con stato ( ad esempio l'elaborazione del timeout nelle flatMapGroupsWithState aggregazioni finestra) richiedono la lettura di interi dati nel database tramite iteratore. Dimensioni totali dei dati non compressi letti usando l'iteratore. |