Che cos'è il monitoraggio del progresso asincrono?
Importante
Questa funzionalità si trova in anteprima pubblica.
Il rilevamento asincrono dello stato di avanzamento consente alle pipeline di Structured Streaming di eseguire il checkpoint in modo asincrono e in parallelo all'elaborazione effettiva dei dati all'interno di un micro-batch, riducendo la latenza associata alla gestione dei offsetLog
e commitLog
.
Nota
Il monitoraggio dei progressi asincrono non funziona con i trigger Trigger.once
o Trigger.availableNow
. Il tentativo di abilitare questa funzionalità con questi trigger genera un errore di query.
Come funziona il tracciamento del progresso asincrono per ridurre la latenza?
Structured Streaming si basa sulla persistenza e sulla gestione degli offset come indicatori di avanzamento per l'elaborazione delle query. Offset'operazione di gestione influisce direttamente sulla latenza di elaborazione, perché non può verificarsi alcuna elaborazione dati fino al completamento di queste operazioni. Il rilevamento asincrono dello stato consente alle pipeline Structured Streaming di eseguire il checkpoint dello stato di avanzamento senza essere influenzate da queste operazioni di gestione offset.
Quando è necessario configurare la frequenza dei checkpoint?
Gli utenti possono configurare la frequenza con cui si esegue il checkpoint dello stato di avanzamento. Le impostazioni predefinite per la frequenza dei checkpoint offrono una buona produttività per la maggior parte delle query. La configurazione della frequenza è utile per gli scenari in cui le operazioni di gestione offset si verificano a una velocità superiore a quella che possono essere elaborate, creando un backlog sempre maggiore di operazioni di gestione offset. Per ovviare a questo backlog in crescita, l'elaborazione dei dati viene bloccata o rallentata, ripristinando essenzialmente il comportamento di elaborazione per eliminare i vantaggi del rilevamento asincrono dello stato di avanzamento.
Nota
Il tempo di recupero degli errori aumenta con l'allungarsi dell'intervallo di checkpoint. In caso di errore, una pipeline deve rielaborare tutti i dati che precedono l'ultimo checkpoint eseguito con successo. Gli utenti possono considerare questo compromesso tra una latenza inferiore durante l'elaborazione regolare e il tempo di ripristino in caso di errore.
Quali configurazioni sono associate al rilevamento asincrono dello stato di avanzamento?
Opzione | Valore | Predefinito | Descrizione |
---|---|---|---|
asyncProgressTrackingEnabled | vero/falso | falso | abilitare o disabilitare il rilevamento dello stato di avanzamento asincrono |
intervalloMsDiCheckpointPerTracciamentoDelProgressoAsync | Millisecondi | 1000 | l'intervallo in cui si eseguono i commit degli offset e dei completamenti |
In che modo gli utenti possono abilitare il rilevamento asincrono dello stato di avanzamento?
Gli utenti possono usare codice simile al codice seguente per abilitare questa funzionalità:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Disabilitare il monitoraggio del progresso asincrono
Quando il rilevamento dello stato asincrono è abilitato, il framework non verifica lo stato di avanzamento del checkpoint per ogni batch. Per risolvere questo problema, prima di disabilitare il rilevamento asincrono dello stato, elaborare almeno due micro-batch con le impostazioni seguenti:
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
Interrompere la query dopo che almeno due micro-batch hanno completato l'elaborazione. Ora puoi disabilitare in modo sicuro il tracciamento dell'avanzamento asincrono e riavviare l'interrogazione.
Se hai disabilitato il rilevamento dello stato asincrono senza completare questo passaggio, potresti riscontrare il seguente errore:
java.lang.IllegalStateException: batch x doesn't exist
Nei log del driver potrebbe essere visualizzato l'errore seguente:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Seguendo le istruzioni riportate in questa sezione per disabilitare il monitoraggio asincrono dei progressi, è possibile risolvere questi errori e riparare il carico di lavoro di streaming.
Limitazioni con il rilevamento asincrono dello stato di avanzamento
Questa funzionalità presenta le limitazioni seguenti:
- Il monitoraggio del progresso asincrono è supportato solo nelle pipeline senza stato quando si utilizza Kafka come sink.
- L'elaborazione end-to-end esattamente una volta non è garantita con il tracciamento del progresso asincrono perché gli intervalli di offset per batch possono essere modificati in caso di errore. Alcuni sink, ad esempio Kafka, non forniscono mai garanzie di tipo exactly-once.