Considerazioni sulla produzione per Structured Streaming
Questo articolo contiene raccomandazioni per la pianificazione di carichi di lavoro Structured Streaming usando processi in Azure Databricks.
Databricks consiglia di eseguire sempre le operazioni seguenti:
-
Remove codice non necessario dai notebook che potrebbero restituire risultati, ad esempio
display
ecount
. - Non eseguire carichi di lavoro Structured Streaming usando il calcolo multiuso. Pianificare sempre i flussi come processi usando il calcolo processi.
- Pianificare i processi usando la modalità
Continuous
. - Non abilitare la scalabilità automatica per il calcolo per i processi Structured Streaming.
Alcuni carichi di lavoro traggono vantaggio da quanto segue:
- Configurare l'archivio di stato RocksDB su Azure Databricks
- Checkpoint dello stato asincrono per le query con stato
- Che cos’è il rilevamento asincrono dello stato di avanzamento?
Azure Databricks ha introdotto Delta Live Tables per ridurre le complessità della gestione dell'infrastruttura di produzione per i carichi di lavoro Streaming Strutturato. Databricks consiglia di usare Tables Delta Live per le nuove pipeline di Structured Streaming. Vedi Che cos'è Delta Live Tables?.
Nota
La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare delta live Tables con scalabilità automatica avanzata per i carichi di lavoro di streaming. Visualizza Optimize l'utilizzo del cluster delle pipeline Delta Live Tables con scalabilità automatica avanzata.
Progettare carichi di lavoro di streaming per aspettarsi un errore
Databricks consiglia di configurare sempre i processi di streaming per il riavvio automatico in caso di errore. Alcune funzionalità, tra cui schema evoluzione, presuppongono che i carichi di lavoro Structured Streaming siano configurati per riprovare automaticamente. Vedere Configurare processi Structured Streaming per riavviare le query di streaming in caso di errore.
Alcune operazioni, ad esempio foreachBatch
forniscono garanzie di tipo almeno una volta anziché esattamente una volta. Per queste operazioni, è necessario fare in modo che la pipeline di elaborazione sia idempotente. Vedere Usare foreachBatch per scrivere sink di dati arbitrari.
Nota
Quando una query viene riavviata, viene elaborato il micro-batch pianificato durante l'esecuzione precedente. Se il processo non è riuscito a causa di un errore di memoria insufficiente o se è stato annullato manualmente un processo a causa di un micro batch sovradimensionato, potrebbe essere necessario aumentare le prestazioni del calcolo per elaborare correttamente il micro batch.
Se si modificano le configurazioni tra le esecuzioni, queste configurazioni si applicano al primo nuovo batch pianificato. Vedere Ripristinare dopo le modifiche in una query Structured Streaming.
Quando viene eseguito un nuovo tentativo di processo?
È possibile pianificare più attività come parte di un processo di Azure Databricks. Quando si configura un processo usando il trigger continuo, non è possibile set dipendenze tra le attività.
È possibile scegliere di pianificare più flussi in un singolo processo usando uno degli approcci seguenti:
- Attività multiple: definire un processo con attività multiple che eseguono carichi di lavoro di streaming usando il trigger continuo.
- Query multiple: definire query di streaming multiple nel codice sorgente per una singola attività.
È anche possibile combinare queste strategie. Il seguente table confronta questi approcci.
Attività multiple | Query multiple | |
---|---|---|
Come viene condiviso il calcolo? | Databricks consiglia di distribuire processi di calcolo con dimensioni appropriate per ogni attività di streaming. Facoltativamente, è possibile condividere il calcolo tra le attività. | Tutte le query condividono lo stesso calcolo. È possibile assegnare facoltativamente query ai pool di utilità di pianificazione. |
Come vengono gestiti i tentativi? | Tutte le attività devono avere esito negativo prima del tentativo del processo. | L'attività ritenta se una query non riesce. |
Configurare processi Structured Streaming per riavviare le query di streaming in caso di errore
Databricks consiglia di configurare tutti i carichi di lavoro di streaming usando il trigger continuo. Vedere Eseguire processi in modo continuo.
Il trigger continuo fornisce il comportamento seguente per impostazione predefinita:
- Impedisce più esecuzioni simultanee del processo.
- Avvia una nuova esecuzione quando un'esecuzione precedente ha esito negativo.
- Usa il backoff esponenziale per i tentativi.
Databricks consiglia di usare sempre il calcolo processi anziché il calcolo multiuso per la pianificazione dei flussi di lavoro. In caso di errore del processo e nuovi tentativi, le nuove risorse di calcolo vengono distribuite.
Nota
Non è necessario usare streamingQuery.awaitTermination()
o spark.streams.awaitAnyTermination()
. I processi impediscono automaticamente il completamento di un'esecuzione quando una query di streaming è attiva.
Usare pool di utilità di pianificazione per query di streaming multiple
È possibile configurare i pool di pianificazione per assegnare capacità di calcolo alle query durante l'esecuzione di query di streaming multiple dallo stesso codice sorgente.
Per impostazione predefinita, tutte le query avviate in un notebook vengono eseguite nello stesso pool di pianificazione equo. I processi Apache Spark generati da trigger da tutte le query di streaming in un notebook vengono eseguiti uno dopo l'altro nell'ordine FIFO (First In, First Out). Ciò può causare ritardi non necessari nelle query, perché non condividono in modo efficiente le risorse del cluster.
I pool di utilità di pianificazione consentono di dichiarare le query Structured Streaming che condividono le risorse di calcolo.
Nell'esempio seguente viene assegnato query1
a un pool dedicato, mentre query2
e query3
condividono un pool di utilità di pianificazione.
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
Nota
La configurazione della proprietà locale deve trovarsi nella stessa cella del notebook where in cui si avvia la query di streaming.
Per altri dettagli, vedere la documentazione di Apache Fair Scheduler.