Condividi tramite


Esecuzione adattiva di query

L'esecuzione di query adattive è la riottimizzazione delle query che si verifica durante l'esecuzione delle query.

La motivazione per la riottimizzazione del runtime è che Azure Databricks ha le statistiche più aggiornate accurate alla fine di uno scambio casuale e di trasmissione (detto fase di query in AQE). Di conseguenza, Azure Databricks può scegliere una strategia fisica migliore, scegliere una dimensione e un numero di partizioni post-shuffle ottimali oppure eseguire ottimizzazioni usate per richiedere suggerimenti, ad esempio la gestione dell'asimmetria dei join.

Ciò può essere molto utile quando la raccolta delle statistiche non è attivata o quando le statistiche non sono aggiornate. È utile anche in posizioni in cui le statistiche derivate in modo statico sono imprecise, ad esempio al centro di una query complessa o dopo l'occorrenza di asimmetrie dei dati.

Funzionalità

AQE è abilitato per impostazione predefinita. Ha 4 funzionalità principali:

  • Modifica dinamicamente l'ordinamento del merge join in broadcast hash join.
  • Unisce dinamicamente le partizioni (combinano partizioni di piccole dimensioni in partizioni ragionevolmente ridimensionate) dopo lo scambio casuale. Le attività molto piccole hanno una velocità effettiva di I/O peggiore e tendono a soffrire di più dalla pianificazione del sovraccarico e dall'overhead di configurazione delle attività. La combinazione di attività di piccole dimensioni consente di risparmiare risorse e migliorare la velocità effettiva del cluster.
  • Gestisce dinamicamente l'asimmetria nell'ordinamento del join di tipo merge e il join hash casuale suddividendo (e replicando, se necessario) attività asimmetrica in attività di dimensioni approssimative.
  • Rileva e propaga dinamicamente le relazioni vuote.

Applicazione

AQE si applica a tutte le query che sono:

  • Non in streaming
  • Contiene almeno uno scambio (in genere quando è presente un join, un'aggregazione o una finestra), una sottoquery o entrambe.

Non tutte le query applicate a AQE sono necessariamente ottimizzate di nuovo. La riottimizzazione potrebbe o non essere disponibile con un piano di query diverso da quello compilato in modo statico. Per determinare se il piano di una query è stato modificato da AQE, vedere la sezione Seguente, Piani di query.

Piani di query

In questa sezione viene illustrato come esaminare i piani di query in modi diversi.

Contenuto della sezione:

Interfaccia utente di Spark

Nodo AdaptiveSparkPlan

Le query applicate aQE contengono uno o più AdaptiveSparkPlan nodi, in genere come nodo radice di ogni query principale o sottoquery. Prima dell'esecuzione della query o quando è in esecuzione, il isFinalPlan flag del nodo corrispondente AdaptiveSparkPlan viene visualizzato come false; al termine dell'esecuzione della query, il isFinalPlan flag viene modificato in true.

Piano in evoluzione

Il diagramma del piano di query si evolve man mano che l'esecuzione avanza e riflette il piano più recente in esecuzione. I nodi che sono già stati eseguiti (in cui sono disponibili le metriche) non cambiano, ma quelli che non possono cambiare nel tempo in seguito alle riottimizzazione.

Di seguito è riportato un esempio di diagramma del piano di query:

Query plan diagram

DataFrame.explain()

Nodo AdaptiveSparkPlan

Le query applicate aQE contengono uno o più AdaptiveSparkPlan nodi, in genere come nodo radice di ogni query principale o sottoquery. Prima dell'esecuzione della query o quando è in esecuzione, il isFinalPlan flag del nodo corrispondente AdaptiveSparkPlan viene visualizzato come false; al termine dell'esecuzione della query, il isFinalPlan flag viene modificato in true.

Piano corrente e iniziale

In ogni AdaptiveSparkPlan nodo sarà presente sia il piano iniziale (il piano prima di applicare eventuali ottimizzazioni AQE) sia il piano corrente o finale, a seconda che l'esecuzione sia stata completata. Il piano corrente si evolverà man mano che l'esecuzione procede.

Statistiche di runtime

Ogni fase di riproduzione casuale e broadcast contiene statistiche dei dati.

Prima dell'esecuzione della fase o dell'esecuzione della fase, le statistiche sono stime in fase di compilazione e il flag isRuntime è false, ad esempio: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Al termine dell'esecuzione della fase, le statistiche vengono raccolte in fase di esecuzione e il flag isRuntime diventerà true, ad esempio: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Di seguito è riportato un DataFrame.explain esempio:

  • Prima dell'esecuzione

    Before execution

  • Durante l'esecuzione

    During execution

  • Dopo l'esecuzione

    After execution

SQL EXPLAIN

Nodo AdaptiveSparkPlan

Le query applicate aQE contengono uno o più nodi AdaptiveSparkPlan, in genere come nodo radice di ogni query principale o sottoquery.

Nessun piano corrente

Come SQL EXPLAIN non esegue la query, il piano corrente è sempre uguale al piano iniziale e non riflette ciò che alla fine verrebbe eseguito da AQE.

Di seguito è riportato un esempio di spiegazione DI SQL:

SQL explain

Efficacia

Il piano di query cambierà se una o più ottimizzazioni di AQE diventano effettive. L'effetto di queste ottimizzazioni AQE è dimostrato dalla differenza tra i piani correnti e finali e il piano iniziale e i nodi di piano specifici nei piani correnti e finali.

  • Modificare dinamicamente l'unione di ordinamento in broadcast hash join: nodi di join fisici diversi tra il piano corrente/finale e il piano iniziale

    Join strategy string

  • Unire dinamicamente le partizioni: nodo CustomShuffleReader con proprietà Coalesced

    Custom shuffle reader

    Custom shuffle reader string

  • Gestire dinamicamente l'asimmetria join: nodo SortMergeJoin con campo isSkew true.

    Skew join plan

    Skew join string

  • Rilevare e propagare dinamicamente le relazioni vuote: parte di (o intero) il piano viene sostituito dal nodo LocalTableScan con il campo relazione come vuoto.

    Local table scan

    Local table scan string

Configurazione

Contenuto della sezione:

Abilitare e disabilitare l'esecuzione di query adattive

Proprietà
spark.databricks.optimizer.adaptive.enabled

Tipo: Boolean

Se abilitare o disabilitare l'esecuzione di query adattive.

Valore predefinito: true

Abilitare la sequenza casuale ottimizzata automaticamente

Proprietà
spark.sql.shuffle.partitions

Tipo: Integer

Numero predefinito di partizioni da usare per la riproduzione casuale dei dati per join o aggregazioni. L'impostazione del valore auto abilita lo shuffle ottimizzato automaticamente, che determina automaticamente questo numero in base al piano di query e alle dimensioni dei dati di input della query.

Nota: per Structured Streaming, questa configurazione non può essere modificata tra i riavvii della query dalla stessa posizione del checkpoint.

Valore predefinito: 200

Modificare dinamicamente l'unione di ordinamento in broadcast hash join

Proprietà
spark.databricks.adaptive.autoBroadcastJoinThreshold

Tipo: Byte String

Soglia per attivare il passaggio al join di trasmissione in fase di esecuzione.

Valore predefinito: 30MB

Unire dinamicamente le partizioni

Proprietà
spark.sql.adaptive.coalescePartitions.enabled

Tipo: Boolean

Se abilitare o disabilitare la unione delle partizioni.

Valore predefinito: true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Tipo: Byte String

Dimensioni della destinazione dopo l'unione. Le dimensioni delle partizioni coalesce saranno vicine, ma non superiori a quelle di questa destinazione.

Valore predefinito: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Tipo: Byte String

Dimensioni minime delle partizioni dopo l'unione. Le dimensioni delle partizioni coalesce non saranno inferiori a quelle di questa dimensione.

Valore predefinito: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Tipo: Integer

Numero minimo di partizioni dopo l'unione. Non consigliato, perché l'impostazione esegue l'override esplicito
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valore predefinito: 2x no. di core del cluster

Gestire dinamicamente il join di asimmetria

Proprietà
spark.sql.adaptive.skewJoin.enabled

Tipo: Boolean

Se abilitare o disabilitare la gestione dell'asimmetria dei join.

Valore predefinito: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Tipo: Integer

Fattore che, se moltiplicato per le dimensioni della partizione mediano, contribuisce a determinare se una partizione è asimmetrica.

Valore predefinito: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Tipo: Byte String

Soglia che contribuisce a determinare se una partizione è asimmetrica.

Valore predefinito: 256MB

Una partizione viene considerata asimmetrica quando e (partition size > skewedPartitionFactor * median partition size) (partition size > skewedPartitionThresholdInBytes) sono true.

Rilevare e propagare dinamicamente le relazioni vuote

Proprietà
spark.databricks.adaptive.emptyRelationPropagation.enabled

Tipo: Boolean

Se abilitare o disabilitare la propagazione dinamica delle relazioni vuote.

Valore predefinito: true

Domande frequenti

Contenuto della sezione:

Perché AQE non ha trasmesso una piccola tabella join?

Se la dimensione della relazione prevista per la trasmissione scende al di sotto di questa soglia, ma non è ancora trasmessa:

  • Controllare il tipo di join. La trasmissione non è supportata per determinati tipi di join, ad esempio la relazione sinistra di un LEFT OUTER JOIN oggetto non può essere trasmessa.
  • Può anche essere che la relazione contiene molte partizioni vuote, nel qual caso la maggior parte delle attività può terminare rapidamente con il merge join di ordinamento o può essere potenzialmente ottimizzata con la gestione del join asimmetria. AQE evita di modificare tali join di ordinamento per trasmettere join hash se la percentuale di partizioni non vuote è inferiore a spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

È comunque consigliabile usare un hint per la strategia di join broadcast con AQE abilitato?

Sì. Un join di trasmissione pianificato in modo statico è in genere più efficiente di quello pianificato dinamicamente da AQE, perché AQE potrebbe non passare al join broadcast fino a quando non si esegue shuffle per entrambi i lati del join (entro il quale vengono ottenute le dimensioni effettive delle relazioni). Pertanto, l'uso di un hint di trasmissione può comunque essere una buona scelta se si conosce bene la query. AQE rispetterà gli hint di query allo stesso modo dell'ottimizzazione statica, ma può comunque applicare ottimizzazioni dinamiche non interessate dagli hint.

Qual è la differenza tra l'hint di join asimmetria e l'ottimizzazione dell'asimmetria dei join AQE? Quale è più indicato usare?

È consigliabile basarsi sulla gestione dell'asimmetria del join AQE anziché usare l'hint di join asimmetria, perché il join asimmetria di AQE è completamente automatico e in generale offre prestazioni migliori rispetto alla controparte hint.

Perché AQE non ha modificato automaticamente l'ordinamento dei join?

Il riordinamento del join dinamico non fa parte di AQE.

Perché AQE non ha rilevato un'asimmetria dei dati?

Esistono due condizioni di dimensione che devono essere soddisfatte affinché AQE rilevi una partizione come partizione asimmetrica:

  • Le dimensioni della spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes partizione sono maggiori di (impostazione predefinita 256 MB)
  • Le dimensioni della partizione sono maggiori delle dimensioni mediano di tutte le partizioni volte il fattore spark.sql.adaptive.skewJoin.skewedPartitionFactor di partizione asimmetrico (impostazione predefinita 5)

Inoltre, il supporto per la gestione delle asimmetrie è limitato per determinati tipi di join, ad esempio in LEFT OUTER JOIN, è possibile ottimizzare solo l'asimmetria sul lato sinistro.

Legacy

Il termine "Esecuzione adattiva" esiste da Spark 1.6, ma il nuovo AQE in Spark 3.0 è fondamentalmente diverso. In termini di funzionalità, Spark 1.6 esegue solo la parte "unione dinamica delle partizioni". In termini di architettura tecnica, il nuovo AQE è un framework di pianificazione dinamica e riorganizzazione delle query in base alle statistiche di runtime, che supporta un'ampia gamma di ottimizzazioni, ad esempio quelle descritte in questo articolo e può essere esteso per consentire un maggior numero di ottimizzazioni potenziali.