Prise en main de COPY INTO pour charger des données.
La commande SQL COPY INTO
vous permet de charger des données d’un emplacement de fichier vers une table Delta. Il s’agit d’une opération répétable et idempotente : les fichiers de l’emplacement source qui ont déjà été chargés sont ignorés.
COPY INTO
offre les capacités suivantes :
- Filtres de fichiers ou de répertoires facilement configurables à partir du stockage en ligne, notamment les volumes S3, ADLS Gen2, ABFS, GCS et Unity Catalog.
- Prise en charge de plusieurs formats de fichiers sources : CSV, JSON, XML, Avro, ORC, Parquet, texte et fichiers binaires
- Traitement de fichiers une seule fois (idempotent) par défaut
- Inférence, mappage, fusion et évolution de schéma de table cible
Remarque
Pour bénéficier d’une expérience d’ingestion de fichiers plus évolutive et plus robuste, Databricks recommande aux utilisateurs SQL de tirer parti des tables de diffusion en continu. Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL.
Avertissement
COPY INTO
respecte le paramètre d’espace de travail pour les vecteurs de suppression. Si cette option est activée, les vecteurs de suppression sont activés sur la table cible lorsque COPY INTO
s’exécute sur un entrepôt SQL ou que du calcul s’exécute sur Databricks Runtime 14.0 ou une version ultérieure. Une fois activés, les vecteurs de suppression bloquent les requêtes sur une table dans Databricks Runtime 11.3 LTS et les versions inférieures. Voir Quels sont les vecteurs de suppression ? et Activer automatiquement les vecteurs de suppression.
Spécifications
Un administrateur de compte doit suivre les étapes décrites dans Configurer l’accès aux données pour l’ingestion pour configurer l’accès aux données dans le stockage d’objets cloud avant que les utilisateurs puissent charger des données à l’aide de COPY INTO
.
Exemple : charger des données dans une table Delta Lake sans schéma
Remarque
Cette fonctionnalité est disponible sur Databricks Runtime 11.3 LTS et versions ultérieures.
Vous pouvez créer des tables Delta d’espace réservé vides afin que le schéma soit déduit ultérieurement lors d’une commande COPY INTO
en configurant mergeSchema
vers true
dans COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
L’instruction SQL ci-dessus est idempotente et peut être planifiée pour s’exécuter de manière à ingérer des données exactement une fois dans une table Delta.
Notes
La table Delta vide n’est pas utilisable en dehors de COPY INTO
. INSERT INTO
et MERGE INTO
ne sont pas pris en charge pour écrire des données dans des tables Delta sans schéma. Une fois les données insérées dans la table avec COPY INTO
, la table devient interrogeable.
Voir la section Créer des tables cibles pour COPY INTO.
Exemple : définir le schéma et charger des données dans une table Delta Lake
L’exemple suivant montre comment créer une table Delta, puis utiliser la commande SQL COPY INTO
pour charger dans la table des exemples de données provenant de jeux de données Databricks. Vous pouvez exécuter l’exemple de code Python, R, Scala ou SQL à partir d’un notebook attaché à un cluster Azure Databricks. Vous pouvez aussi exécuter le code SQL à partir d’une requête associée à un entrepôt SQL dans Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Pour nettoyer, exécutez le code suivant, qui supprime la table :
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Nettoyer les fichiers de métadonnées
Vous pouvez exécuter VACUUM pour nettoyer les fichiers de métadonnées non référencés créés par COPY INTO
dans Databricks Runtime 15.2 et ultérieur.
Informations de référence
- Databricks Runtime 7.x et versions ultérieures : COPY INTO
Ressources supplémentaires
Charger des données à l’aide de COPY INTO avec des volumes ou emplacements externes Unity Catalog
Charger des données à l’aide de COPY INTO avec un principal de service
Pour obtenir des modèles d’utilisation courante, y compris des exemples d’opérations multiples
COPY INTO
sur la même table Delta, consultez la section Modèles de chargement de données courants à l’aide de COPY INTO.