Remplacer sélectivement les données avec Delta Lake
Azure Databricks tire parti de la fonctionnalité Delta Lake pour prendre en charge deux options distinctes pour les remplacements sélectifs :
- L’option
replaceWhere
remplace atomiquement tous les enregistrements qui correspondent à un prédicat donné. - Vous pouvez remplacer des répertoires de données en fonction de la façon dont les tables sont partitionnées à l’aide de remplacements de partition dynamiques.
Pour la plupart des opérations, Databricks recommande d’utiliser replaceWhere
pour spécifier les données à remplacer.
Important
Si les données ont été accidentellement remplacées, vous pouvez utiliser la restauration pour annuler la modification.
Remplacement sélectif arbitraire avec replaceWhere
Vous pouvez remplacer de manière sélective uniquement les données qui correspondent à une expression arbitraire.
Remarque
SQL requiert Databricks Runtime 12.2 LTS ou version ultérieure.
La commande suivante remplace atomiquement les événements en janvier dans la table cible, qui est partitionnée par start_date
, avec les données dans replace_data
:
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
Cet exemple de code écrit les données dans replace_data
, valide que toutes les lignes correspondent au prédicat et effectue un remplacement atomique au moyen de la sémantique overwrite
. Si des valeurs de l’opération se trouvent hors de la contrainte, cette opération échoue par défaut avec une erreur.
Vous pouvez modifier ce comportement en valeurs overwrite
dans la plage de prédicats et les enregistrements insert
qui se trouvent hors de la plage spécifiée. Pour ce faire, désactivez la vérification des contraintes en définissant la valeur spark.databricks.delta.replaceWhere.constraintCheck.enabled
sur false à l’aide de l’un des paramètres suivants :
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
Comportement hérité
Avec le comportement par défaut hérité, replaceWhere
remplaçait les données correspondant à un prédicat sur les colonnes de partition uniquement. Avec ce modèle hérité, la commande suivante remplace atomiquement le mois de janvier dans la table cible, partitionnée par date
, par les données dans df
:
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
Si vous souhaitez revenir à l’ancien comportement, vous pouvez désactiver l’indicateur spark.databricks.delta.replaceWhere.dataColumns.enabled
:
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
Remplacements de partition dynamique
Important
Cette fonctionnalité est disponible en préversion publique.
Databricks Runtime 11.3 LTS et versions ultérieures prend en charge le mode de remplacement de partition dynamique pour les tables partitionnées. Pour les tables avec plusieurs partitions, Databricks Runtime 11.3 LTS et versions antérieures prend uniquement en charge les remplacements de partition dynamiques si toutes les colonnes de partition sont du même type de données.
En mode de remplacement de partition dynamique, les opérations remplacent toutes les données existantes dans chaque partition logique pour laquelle l’écriture valide de nouvelles données. Toutes les partitions logiques existantes pour lesquelles l’écriture ne contient pas de données restent inchangées. Ce mode s’applique uniquement lorsque les données sont écrites en mode de remplacement : soit INSERT OVERWRITE
en SQL, ou une écriture DataFrame avec df.write.mode("overwrite")
.
Configurez le mode de remplacement de partition dynamique en définissant la configuration de session Spark spark.sql.sources.partitionOverwriteMode
sur dynamic
. Vous pouvez également l’activer en définissant l’option DataFrameWriter
partitionOverwriteMode
sur dynamic
. Si elle est présente, l’option spécifique à la requête remplace le mode défini dans la configuration de session. La valeur par défaut de partitionOverwriteMode
est de static
.
Important
Vérifiez que les données écrites avec une partition dynamique remplacent uniquement les partitions attendues. Une seule ligne dans la partition incorrecte peut entraîner une remplacement involontaire d’une partition entière.
L’exemple suivant illustre l’utilisation de remplacements de partition dynamique :
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
Notes
- La remplacement dynamique de la partition est en conflit avec l’option
replaceWhere
pour les tables partitionnées.- Si le remplacement de partition dynamique est activé dans la configuration de session Spark et
replaceWhere
est fourni en tant qu’optionDataFrameWriter
, Delta Lake remplace les données en fonction de l’expressionreplaceWhere
(les options spécifiques aux requêtes remplacent les configurations de session). - Vous recevez une erreur si les options
DataFrameWriter
disposent à la fois d’un remplacement de partition dynamique etreplaceWhere
activé.
- Si le remplacement de partition dynamique est activé dans la configuration de session Spark et
- Vous ne pouvez pas spécifier
overwriteSchema
commetrue
lors de l’utilisation du remplacement de partition dynamique.