Partager via


Exécution de requête adaptative

L’exécution adaptative des requêtes correspond à la réoptimisation de la requête qui se produit pendant son exécution.

La raison d’être de la réoptimisation à l’exécution est qu’Azure Databricks dispose des dernières statistiques à la fin d’un échange de brassage et de diffusion (nommé étape de requête dans l’exécution adaptative des requêtes). Par conséquent, Azure Databricks peut opter pour une meilleure stratégie physique, choisir une taille et un nombre optimaux de partitions après brassage ou effectuer des optimisations qui exigeaient auparavant des indicateurs (par exemple la gestion des jointures d’asymétrie).

Cette solution peut se révéler très utile lorsque la collecte de statistiques n’est pas activée ou que les statistiques sont obsolètes. Elle est également pratique quand les statistiques dérivées statiquement sont inexactes, par exemple au milieu d’une requête compliquée ou après l’apparition d’une asymétrie des données.

Fonctionnalités

AQE est activé par défaut. Elle comporte quatre fonctionnalités majeures :

  • Modification dynamique de la jointure de fusion de tri en jointure hachée de diffusion
  • Fusion dynamique des partitions (combinaison de partitions de petite taille en partitions de dimensions raisonnables) après un échange de brassage. Les très petites tâches présentent un moins bon débit d’E/S et ont tendance à pâtir davantage de la surcharge de planification et de configuration des tâches. La combinaison de petites tâches permet d’économiser les ressources et d’améliorer le débit du cluster.
  • Gestion dynamique de l’asymétrie dans la jointure de fusion de tri et la jointure hachée de brassage en fractionnant (et en répliquant si nécessaire) les tâches asymétriques en tâches de taille à peu près égale.
  • Détection et propagation dynamique des relations vides.

Application

L’exécution adaptative des requêtes s’applique à toutes les requêtes qui présentent les caractéristiques suivantes :

  • Requêtes qui ne sont pas des requêtes de streaming
  • Requêtes qui contiennent au moins un échange (généralement lorsqu’il existe une jointure, une agrégation ou une fenêtre), une sous-requête ou les deux

Toutes les requêtes auxquelles s’applique l’exécution adaptative des requêtes ne sont pas nécessairement réoptimisées. La réoptimisation peut ou non être accompagnée d’un autre plan de requête que le plan compilé statiquement. Pour déterminer si le plan d’une requête a été modifié par l’exécution adaptative des requêtes, consultez la section suivante, Plans de requête.

Plans de requête

Cette section explique comment examiner les plans de requête de différentes façons.

Dans cette section :

Interface utilisateur Spark

Nœud AdaptiveSparkPlan

Les requêtes auxquelles s’applique l’exécution adaptative des requêtes contiennent un ou plusieurs nœuds AdaptiveSparkPlan, généralement en tant que nœud racine de chaque requête principale ou sous-requête. Avant ou pendant l’exécution de la requête, l’indicateur isFinalPlan du nœud AdaptiveSparkPlan correspondant apparaît comme false. Une fois l’exécution de la requête isFinalPlan terminée, il devient true..

Plan en évolution

Le diagramme du plan de requête évolue à mesure que l’exécution progresse pour refléter le plan le plus récent en cours d’exécution. Les nœuds qui ont déjà été exécutés (dans lesquels des métriques sont disponibles) ne changent pas, mais ceux qui ne le sont pas encore évoluent au fil du temps en raison des réoptimisations.

Voici un exemple de diagramme de plan de requête :

Query plan diagram

DataFrame.explain()

Nœud AdaptiveSparkPlan

Les requêtes auxquelles s’applique l’exécution adaptative des requêtes contiennent un ou plusieurs nœuds AdaptiveSparkPlan, généralement en tant que nœud racine de chaque requête principale ou sous-requête. Avant ou pendant l’exécution de la requête, l’indicateur isFinalPlan du nœud AdaptiveSparkPlan correspondant apparaît comme false. Une fois l’exécution de la requête isFinalPlan terminée, il devient true.

Plan actuel et initial

Sous chaque nœud AdaptiveSparkPlan se trouvent à la fois le plan initial (avant application des optimisations de l’exécution adaptative des requêtes) et le plan actuel ou final, selon que l’exécution est terminée ou non. Le plan actuel évolue à mesure que l’exécution progresse.

Statistiques d’exécution

Chaque étape de brassage et de diffusion contient des statistiques de données.

Avant ou pendant l’exécution de l’étape, les statistiques sont des estimations établies au moment de la compilation, et l’indicateur isRuntime est false (par exemple Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);).

Une fois l’exécution de l’étape terminée, les statistiques sont celles qui sont collectées à l’exécution, et l’indicateur isRuntime devient true (par exemple Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)).

Voici un exemple DataFrame.explain :

  • Avant l’exécution :

    Before execution

  • Pendant l’exécution :

    During execution

  • Après l’exécution :

    After execution

SQL EXPLAIN

Nœud AdaptiveSparkPlan

Les requêtes auxquelles s’applique l’exécution adaptative des requêtes contiennent un ou plusieurs nœuds AdaptiveSparkPlan, généralement en tant que nœud racine de chaque requête principale ou sous-requête.

Aucun plan actuel

Comme SQL EXPLAIN n’exécute pas la requête, le plan actuel est toujours le même que le plan initial et ne reflète pas ce qui serait en fin de compte exécuté par l’exécution adaptative des requêtes.

Voici un exemple SQL EXPLAIN :

SQL explain

Efficacité

Le plan de requête est modifié si une ou plusieurs optimisations de l’exécution adaptative des requêtes sont appliquées. L’effet de ces optimisations est démontré par la différence entre les plans actuel et final et le plan initial et les nœuds de plan spécifiques dans les plans actuel et final.

  • Modification dynamique de la jointure de fusion de tri en jointure hachée de diffusion : différents nœuds de jointure physique entre les plans actuel et final et le plan initial :

    Join strategy string

  • Fusion dynamique des partitions (nœud CustomShuffleReader avec propriété Coalesced) :

    Custom shuffle reader

    Custom shuffle reader string

  • Gestion dynamique de la jointure d’asymétrie (nœud SortMergeJoin avec champ isSkew défini sur la valeur true) :

    Skew join plan

    Skew join string

  • Détection et propagation dynamique des relations vides : une partie (ou la totalité) du plan est remplacée par le nœud LocalTableScan avec le champ relation vide.

    Local table scan

    Local table scan string

Configuration

Dans cette section :

Activation et désactivation de l’exécution adaptative des requêtes

Propriété
spark.databricks.optimizer.adaptive.enabled

Entrez : Boolean

Indique si l’exécution adaptative des requêtes est activée ou désactivée.

Valeur par défaut : true

Activer la lecture aléatoire optimisée automatiquement

Propriété
Spark.sql.shuffle.partitions

Entrez : Integer

Nombre par défaut de partitions à utiliser lors de la lecture aléatoire de données pour les jointures ou les agrégations. La définition de la valeur auto active la lecture aléatoire optimisée automatiquement, qui détermine automatiquement ce nombre en fonction du plan de requête et de la taille des données d’entrée de requête.

Remarque : Pour Structured Streaming, cette configuration ne peut pas être modifiée entre les redémarrages de requêtes à partir du même emplacement de point de contrôle.

Valeur par défaut : 200

Modification dynamique de la jointure de fusion de tri en jointure hachée de diffusion

Propriété
spark.databricks.adaptive.autoBroadcastJoinThreshold

Entrez : Byte String

Seuil pour déclencher le basculement vers la jonction de diffusion à l’exécution.

Valeur par défaut : 30MB

Fusion dynamique des partitions

Propriété
spark.sql.adaptive.coalescePartitions.enabled

Entrez : Boolean

Indique si la fusion de partitions est activée ou désactivée.

Valeur par défaut : true
spark.sql.adaptive.advisoryPartitionSizeInBytes

Entrez : Byte String

Taille cible après fusion. La taille des partitions fusionnées sera proche de cette taille cible, mais pas supérieure.

Valeur par défaut : 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize

Entrez : Byte String

Taille minimale des partitions après fusion. La taille des partitions fusionnées ne sera pas inférieure à cette taille.

Valeur par défaut : 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum

Entrez : Integer

Nombre minimal de partitions après fusion. Non recommandé, car ce paramètre remplace explicitement
spark.sql.adaptive.coalescePartitions.minPartitionSize.

Valeur par défaut : deux fois le nombre de cœurs de cluster

Gestion dynamique de la jointure d’asymétrie

Propriété
spark.sql.adaptive.skewJoin.enabled

Entrez : Boolean

Indique si la gestion des jointures d’asymétrie est activée ou désactivée.

Valeur par défaut : true
spark.sql.adaptive.skewJoin.skewedPartitionFactor

Entrez : Integer

Facteur qui, lorsqu’il est multiplié par la taille médiane de la partition, contribue à déterminer si une partition est asymétrique.

Valeur par défaut : 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

Entrez : Byte String

Seuil qui contribue à déterminer si une partition est asymétrique.

Valeur par défaut : 256MB

Une partition est considérée comme asymétrique lorsque (partition size > skewedPartitionFactor * median partition size) et (partition size > skewedPartitionThresholdInBytes) sont tous deux true.

Détection et propagation dynamique des relations vides

Propriété
spark.databricks.adaptive.emptyRelationPropagation.enabled

Entrez : Boolean

Indique si la propagation dynamique des relations vides est activée ou désactivée.

Valeur par défaut : true

Forum Aux Questions (FAQ)

Dans cette section :

Pourquoi l’exécution adaptative des requêtes n’a-t-elle pas diffusé une petite table de jointure ?

Si une relation n’est pas diffusée alors qu’elle a une taille inférieure à ce seuil, vérifiez les points suivants :

  • Vérifiez le type de jointure. La diffusion n’est pas prise en charge pour certains types de jointures, par exemple la relation gauche d’une requête LEFT OUTER JOIN.
  • Il est également possible que la relation contienne un grand nombre de partitions vides. Dans ce cas, la majorité des tâches peut se terminer rapidement avec la jointure de fusion de tri ou peut potentiellement être optimisée avec la gestion des jointures d’asymétrie. L’exécution adaptative des requêtes évite la modification de ces jointures de fusion de tri pour diffuser des jointures hachées si le pourcentage de partitions non vides est inférieur à spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Dois-je continuer à utiliser un indicateur de stratégie de jointure de diffusion avec l’exécution adaptative des requêtes activée ?

Oui. Une jointure de diffusion planifiée de manière statique est généralement plus performante qu’une jointure planifiée de manière dynamique par l’exécution adaptative des requêtes. En effet, celle-ci peut ne pas basculer vers la jonction de diffusion tant qu’elle n’a pas effectué de brassage pour les deux côtés de la jointure (la taille réelle des relations a à ce stade été récupérée). Un indicateur de diffusion peut donc rester un bon choix si vous connaissez bien votre requête. L’exécution adaptative des requêtes respecte les indicateurs de requête de la même façon que l’optimisation statique, mais peut toujours appliquer des optimisations dynamiques qui ne sont pas concernées par les indicateurs.

Quelle est la différence entre l’indicateur de jointure d’asymétrie et l’optimisation de jointure d’asymétrie de l’exécution adaptative des requêtes ? Que dois-je utiliser ?

Il est recommandé de s’appuyer sur la gestion des jointures d’asymétrie de l’exécution adaptative des requêtes plutôt que d’utiliser l’indicateur de jointure d’asymétrie. En effet, la jointure d’asymétrie de l’exécution adaptative est entièrement automatique et, en général, plus performante que son équivalent avec indicateur.

Pourquoi l’exécution adaptative des requêtes n’a-t-elle pas ajusté automatiquement mon ordre de jointure ?

La réorganisation dynamique des jointures ne fait pas partie de l'AQE.

Pourquoi l’exécution adaptative des requêtes n’a-t-elle pas détecté l’asymétrie des données ?

Deux conditions de taille doivent être satisfaites pour que l’exécution adaptative des requêtes détecte une partition comme étant asymétrique :

  • La taille de la partition est supérieure à spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256 Mo par défaut).
  • La taille de la partition est supérieure à la taille médiane de toutes les partitions multiplié par le facteur de partition asymétrique spark.sql.adaptive.skewJoin.skewedPartitionFactor (5 par défaut).

En outre, la prise en charge de la gestion de l’asymétrie est limitée pour certains types de jointures. Par exemple, dans LEFT OUTER JOIN, seule la relation SKEW sur le côté gauche peut être optimisée.

Ancien

Si le terme « exécution adaptative » existe depuis la version 1.6 de Spark, le nouveau terme « exécution adaptative des requêtes » dans la version 3.0 est fondamentalement différent. Du point de vue des fonctionnalités, Spark 1.6 effectue uniquement la partie « fusion dynamique des partitions ». En termes d’architecture technique, la nouvelle exécution adaptative des requêtes est un framework de planification et de replanification dynamiques des requêtes basé sur les statistiques d’exécution. Il prend en charge diverses optimisations (par exemple celles que nous avons décrites dans cet article) et peut être étendu pour en offrir potentiellement d’autres encore.