Поделиться через


Get начал использовать COPY INTO для загрузки данных

Команда SQL COPY INTO позволяет загружать данные из расположения файла в Delta table. Это идемпотентная операция с возможностью повторных попыток — файлы в исходном расположении, которые уже были загружены, пропускаются.

COPY INTO предлагает следующие возможности:

  • Легко настраиваемые фильтры файлов или каталогов из облачного хранилища, включая S3, ADLS 2-го поколения, ABFS, GCS и Unity Catalogvolumes.
  • Поддержка нескольких форматов исходных файлов: CSV, JSON, XML, Avro, ORC, Parquet, text и двоичных файлов
  • По умолчанию обработка файлов в точности один раз (идемпотентная)
  • Целевой tableschema вывод, сопоставление, слияние и эволюция

Примечание.

Для обеспечения более масштабируемого и надежного процесса приема файлов, Databricks рекомендует пользователям SQL использовать потоковую tables. См. загрузка данных с использованием потоковой передачи tables в Databricks SQL.

Предупреждение

COPY INTO учитывает параметр рабочей области для векторов удаления. Если этот параметр включен, векторы удаления также включены на целевом table, когда COPY INTO выполняется в SQL-хранилище или на вычислительных ресурсах, работающих под управлением Databricks Runtime 14.0 или более поздней версии. После включения векторы удаления блокируют запросы к table в Databricks Runtime 11.3 LTS и ниже. См. векторы удаления и векторы автоматического включения удаления.

Требования

Администратор учетной записи должен выполнить действия, описанные в разделе "Настройка доступа к данным для приема данных", чтобы настроить доступ к данным в облачном хранилище объектов, прежде чем пользователи смогут загружать данные с помощью COPY INTO.

Пример. Загрузка данных в table без схемы Delta Lake

Примечание.

Эта функция доступна в Databricks Runtime 11.3 LTS и выше.

Вы можете создать пустую метку Delta tables, чтобы schema позже выводился во время команды COPY INTO, присвоив mergeSchema значение true в COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

Указанная выше инструкция SQL является идемпотентной и может быть запланирована для выполнения, чтобы гарантировать единоразовый прием данных в Delta table.

Примечание.

Пустая дельта table непригодна для использования за пределами COPY INTO. INSERT INTO и MERGE INTO не поддерживаются для записи данных в Delta без схемы tables. После вставки данных с помощью COPY INTOв table, table становится доступным для запросов.

Дополнительные сведения см. в статье Создание целевого tables для COPY INTO.

Пример: Setschema и загрузка данных в Delta Lake table

В следующем примере показано, как создать Delta table, а затем использовать команду SQL COPY INTO для загрузки образцов данных из наборов данных Databricks в table. Примеры кода на языках Python, R, Scala и SQL можно запустить из записной книжки, подключенной к кластеру Azure Databricks. Вы также можете выполнить код SQL из запроса, связанного с хранилищем SQL, в Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

Чтобы очистить, выполните следующий код, который удаляет table:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

Очистка файлов метаданных

Вы можете запустить VACUUM для очистки файлов метаданных, созданных COPY INTO в Databricks Runtime 15.2 и выше.

Справочные материалы

  • Databricks Runtime 7.x и выше: COPY INTO

Дополнительные ресурсы