Condividi tramite


Configurare l'archivio di stato RocksDB su Azure Databricks

È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente in SparkSession prima di avviare la query di streaming.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")

È possibile abilitare RocksDB nelle pipeline di tabelle live Delta. Vedere Ottimizzare la configurazione della pipeline per l'elaborazione con stato.

Abilitare il checkpoint del log delle modifiche

In Databricks Runtime 13.3 LTS e versioni successive è possibile abilitare il checkpoint del log delle modifiche per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro Structured Streaming. Databricks raccomanda di abilitare il checkpoint del log delle modifiche per tutte le query con stato di Structured Streaming.

In genere, gli snapshot dell'archivio stati di RocksDB e caricano i file di dati durante il checkpoint. Per evitare questo costo, il checkpoint del log delle modifiche scrive solo i record che sono stati modificati dall'ultimo checkpoint nell'archiviazione durevole."

Il checkpoint del log delle modifiche è disabilitato per impostazione predefinita. È possibile abilitare il checkpoint del log delle modifiche nel livello SparkSession usando la sintassi seguente:

spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")

È possibile abilitare il checkpoint del log delle modifiche in un flusso esistente e mantenere le informazioni sullo stato archiviate nel checkpoint.

Importante

Le query che hanno abilitato il checkpoint del log delle modifiche possono essere eseguite solo in Databricks Runtime 13.3 LTS e versioni successive. È possibile disabilitare il checkpoint del log delle modifiche per ripristinare il comportamento di checkpoint legacy, ma è necessario continuare a eseguire queste query in Databricks Runtime 13.3 LTS o versione successiva. È necessario riavviare il processo affinché queste modifiche vengano apportate.

Metriche dell'archivio stati di RocksDB

Ogni operatore di stato raccoglie le metriche correlate alle operazioni di gestione dello stato eseguite nell'istanza di RocksDB per osservare l'archivio stati e potenzialmente contribuire al debug della lentezza del processo. Queste metriche vengono aggregate (somma) per operatore di stato nel processo in tutte le attività in cui è in esecuzione l'operatore di stato. Queste metriche fanno parte della customMetrics mappa all'interno dei stateOperators campi in StreamingQueryProgress. Di seguito è riportato un esempio di StreamingQueryProgress in formato JSON (ottenuto usando StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}

Le descrizioni dettagliate delle metriche sono le seguenti:

Nome metrica Descrizione
rocksdbCommitWriteBatchLatency Tempo (in millis) richiesto per applicare le scritture di staging nella struttura in memoria (WriteBatch) a RocksDB nativo.
rocksdbCommitFlushLatency Tempo (in millis) impiegato per scaricare le modifiche in memoria di RocksDB nel disco locale.
rocksdbCommitCompactLatency Tempo (in millis) impiegato per la compattazione (facoltativo) durante il commit del checkpoint.
rocksdbCommitPauseLatency Tempo (in millis) impiegato per arrestare i thread di lavoro in background (per la compattazione e così via) come parte del commit del checkpoint.
rocksdbCommitCheckpointLatency Tempo (in millis) impiegato per creare uno snapshot di RocksDB nativo e scriverlo in una directory locale.
rocksdbCommitFileSyncLatencyMs Tempo (in millis) impiegato per la sincronizzazione dei file correlati allo snapshot di RocksDB nativo in un archivio esterno (posizione del checkpoint).
rocksdbGetLatency Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Get sottostante.
rocksdbPutCount Tempo medio (in nano) richiesto per la chiamata nativa RocksDB::Put sottostante.
rocksdbGetCount Numero di chiamate native RocksDB::Get (non include Gets writeBatch - batch in memoria usato per le scritture di staging).
rocksdbPutCount Numero di chiamate native RocksDB::Put (non include Puts writeBatch- in batch di memoria usato per le scritture di staging).
rocksdbTotalBytesReadByGet Numero di byte non compressi letti tramite chiamate native RocksDB::Get .
rocksdbTotalBytesWrittenByPut Numero di byte non compressi scritti tramite chiamate native RocksDB::Put .
rocksdbReadBlockCacheHitCount Numero di volte in cui viene usata la cache a blocchi di RocksDB nativa per evitare la lettura dei dati dal disco locale.
rocksdbReadBlockCacheMissCount Numero di volte in cui la cache del blocco RocksDB nativa non è stata rilevata e richiesta la lettura dei dati dal disco locale.
rocksdbTotalBytesReadByCompaction Numero di byte letti dal disco locale dal processo di compattazione di RocksDB nativo.
rocksdbTotalBytesWrittenByCompaction Numero di byte scritti nel disco locale dal processo di compattazione di RocksDB nativo.
rocksdbTotalCompactionLatencyMs Tempo (in millis) impiegato per le compattazioni RocksDB (sia in background che nella compattazione facoltativa avviata durante il commit).
rocksdbWriterStallLatencyMs Tempo (in millis) il writer si è bloccato a causa di una compattazione in background o dello scaricamento delle memtable su disco.
rocksdbTotalBytesReadThroughIterator Alcune delle operazioni con stato ( ad esempio l'elaborazione del timeout nelle flatMapGroupsWithState aggregazioni finestra) richiedono la lettura di interi dati nel database tramite iteratore. Dimensioni totali dei dati non compressi letti usando l'iteratore.