Checkpoint dello stato asincrono per le query con stato
Nota
È disponibile in Databricks Runtime 10.4 LTS e versioni successive.
Il checkpoint dello stato asincrono garantisce esattamente una volta per le query di streaming, ma può ridurre la latenza complessiva per alcuni carichi di lavoro con stato structured streaming con problemi di aggiornamento dello stato. Questa operazione viene eseguita iniziando a elaborare il micro batch successivo non appena il calcolo del micro batch precedente è stato completato senza attendere il completamento del checkpoint dello stato. L'table seguente confronta i compromessi per i checkpoint sincroni e asincroni:
Caratteristica | Checkpoint sincrono | Checkpoint asincrono |
---|---|---|
Latenza | Latenza più elevata per ogni micro batch. | Riduzione della latenza perché i micro batch possono sovrapporsi. |
Riavviare | Recupero rapido poiché solo l'ultimo batch deve essere eseguito di nuovo. | Maggiore ritardo nel riavvio poiché più di un micro batch potrebbe dover essere eseguito di nuovo. |
Di seguito sono riportate le caratteristiche del processo di streaming che possono trarre vantaggio dal checkpoint dello stato asincrono:
- Il processo ha una o più operazioni con stato (ad esempio, aggregazione,
flatMapGroupsWithState
,mapGroupsWithState
, stream-stream join) - La latenza del checkpoint di stato è uno dei principali collaboratori alla latenza di esecuzione batch complessiva. Queste informazioni sono disponibili negli eventi StreamingQueryProgress. Questi eventi sono disponibili anche nei log4j nel driver Spark. Di seguito è riportato un esempio di stato delle query di streaming e come trovare l'impatto del checkpoint di stato sulla latenza di esecuzione batch complessiva.
-
{ "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19", "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe", "...", "batchId" : 0, "durationMs" : { "...", "triggerExecution" : 547730, "..." }, "stateOperators" : [ { "...", "commitTimeMs" : 3186626, "numShufflePartitions" : 64, "..." }] }
Analisi della latenza del checkpoint di stato dell'evento di avanzamento della query
- La durata del batch (
durationMs.triggerDuration
) è di circa 547 sec. - La latenza di commit dell'archivio stati (
stateOperations[0].commitTimeMs
) è di circa 3.186 sec. La latenza di commit viene aggregata tra le task contenenti un archivio stati. In questo caso sono presenti 64 task di questo tipo (stateOperators[0].numShufflePartitions
). - Ogni task contenente un operatore di stato ha richiesto una media di 50 sec (3.186/64) per il checkpoint. Si tratta di una latenza aggiuntiva che contribuisce alla durata del batch. Supponendo che tutte le 64 task vengano eseguite contemporaneamente, il passaggio del checkpoint ha contribuito circa il 9% (50 sec / 547 sec) della durata del batch. La percentuale aumenta ancora di più quando le task simultanee massime sono inferiori a 64.
- La durata del batch (
-
Abilitare il checkpoint dello stato asincrono
È necessario usare l'archivio stati basato su RocksDB per il checkpoint dello stato asincrono. Set le configurazioni seguenti:
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
Limitazioni e requisiti per il checkpoint asincrono
Nota
La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare delta live Tables con scalabilità automatica avanzata per i carichi di lavoro di streaming. Vedere Optimize l'utilizzo del cluster delle pipeline di Tables Delta Live con scalabilità automatica avanzata.
- Qualsiasi errore in un checkpoint asincrono in uno o più archivi fa fallire la query. In modalità di checkpoint sincrono, il checkpoint viene eseguito come parte del task e Spark ritenta il task più volte prima di fallire la query. Questo meccanismo non è presente con il checkpoint dello stato asincrono. Databricks consiglia di usare processi continui per i tentativi automatici in caso di errore del processo. Vedere Eseguire processi in modo continuo.
- Il checkpoint asincrono funziona meglio quando le posizioni dell'archivio stati non vengono modificate tra le esecuzioni di micro batch. Il ridimensionamento del cluster, in combinazione con il checkpoint dello stato asincrono, potrebbe non funzionare correttamente perché l'istanza degli archivi di stato potrebbe get ridistribuire quando i nodi vengono aggiunti o eliminati come parte dell'evento di ridimensionamento del cluster.
- Il checkpoint dello stato asincrono è supportato solo nell'implementazione del provider dell'archivio stati RocksDB. L'implementazione predefinita dell'archivio stati in memoria non lo supporta.