Get ha iniziato a usare COPY INTO per caricare i dati
Il comando COPY INTO
SQL consente di caricare dati da un percorso di file in un tableDelta. Si tratta di un'operazione riabilitabile e idempotente; i file nel percorso di origine che sono già stati caricati vengono ignorati.
COPY INTO
offre le funzionalità seguenti:
- Filtri di file o directory facilmente configurabili dall'archiviazione cloud, tra cui S3, ADLS Gen2, ABFS, GCS e Unity Catalogvolumes.
- Supporto per più formati di file di origine: CSV, JSON, XML, Avro, ORC, Parquet, text e file binari
- Elaborazione di file di tipo exactly-once (idempotente) per impostazione predefinita
- Inferenza di destinazione tableschema, mappatura, unione ed evoluzione
Nota
Per un'esperienza di inserimento di file più scalabile e affidabile, Databricks consiglia agli utenti SQL di sfruttare lo streaming tables. Consulta Caricamento dei dati con streaming tables in Databricks SQL.
Avviso
COPY INTO
rispetta l'impostazione dell'area di lavoro per i vettori di eliminazione. Se attivata, i vettori di eliminazione vengono abilitati sul table di destinazione quando COPY INTO
viene eseguito su un'istanza di SQL Warehouse o su una risorsa di calcolo che utilizza Databricks Runtime 14.0 o versioni successive. Dopo l'abilitazione, i vettori di eliminazione bloccano le query su un table in Databricks Runtime 11.3 LTS e versioni precedenti. Vedere Che cosa sono i vettori di eliminazione? e Abilitare automaticamente i vettori di eliminazione.
Requisiti
Un amministratore dell'account deve seguire la procedura descritta in Configurare l'accesso ai dati per l'inserimento per configurare l'accesso ai dati nell'archiviazione di oggetti cloud prima che gli utenti possano caricare i dati usando COPY INTO
.
Esempio: Carica dati in un Delta Lake senza schema table
Nota
Questa funzionalità è disponibile in Databricks Runtime 11.3 LTS e versioni successive.
È possibile creare un segnaposto Delta vuoto tables in modo che il schema sia in seguito dedotto durante un comando di COPY INTO
impostando mergeSchema
a true
in COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
L'istruzione SQL precedente è idempotente e può essere schedulata per eseguire l'inserimento dei dati esattamente una volta in un tableDelta.
Nota
Il Delta table vuoto non è usabile all'esterno di COPY INTO
.
INSERT INTO
e MERGE INTO
non sono supportati per scrivere dati in delta senza schema tables. Dopo l'inserimento dei dati nella table con COPY INTO
, il table diventa interrogabile.
Vedi Crea il target tables per COPY INTO.
Esempio: Setschema e caricare i dati in un Delta Lake table
Nell'esempio seguente viene illustrato come creare un table Delta e quindi usare il comando COPY INTO
SQL per caricare dati di esempio da set di dati Databricks nel table. È possibile eseguire il codice Python, R, Scala o SQL di esempio da un notebook collegato a un cluster Azure Databricks. È anche possibile eseguire il codice SQL da una query associata a un'istanza di SQL Warehouse in Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Per eseguire la pulizia, eseguire il codice seguente, che elimina il table:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Pulire i file di metadati
È possibile eseguire VACUUM per pulire i file di metadati non referenziati creati da COPY INTO
in Databricks Runtime 15.2 e versioni successive.
Riferimento
- Databricks Runtime 7.x e versioni successive: COPY INTO
Risorse aggiuntive
Caricare dati usando COPY INTO con Unity Catalogvolumes o posizioni esterne
Per i comuni modelli di utilizzo, inclusi esempi di più operazioni di
COPY INTO
sullo stesso Delta table, vedere Modelli comuni di caricamento dei dati utilizzando COPY INTO.