Руководство по COPY INTO с помощью Spark SQL
Databricks рекомендует использовать команду COPY INTO для добавочной и массовой загрузки данных для источников данных, содержащих тысячи файлов. Databricks рекомендует использовать автозагрузчик для расширенных вариантов использования.
В этом руководстве вы используете команду COPY INTO
для загрузки данных из облачного хранилища объектов в table в рабочей области Azure Databricks.
Требования
- Подписка Azure, рабочая область Azure Databricks в этой подписке и кластер в этой рабочей области. Чтобы создать их, обратитесь к . Краткое руководство: Запуск задания Spark в рабочей области Azure Databricks с использованием портала Azure. Если следовать этому краткому руководству, вам не нужно следовать инструкциям в разделе Запуск задания SQL Spark.
- Универсальный кластер , в вашей рабочей области, работающий под управлением Databricks Runtime 11.3 LTS или более поздней версии. Сведения о создании кластера всех целей см. в справочнике по конфигурации вычислений.
- Знакомство с пользовательским интерфейсом рабочей области Azure Databricks. См. Навигация по рабочей области.
- Знакомство с записными книжками Databricks.
- Расположение, которое предназначено для записи данных; Демонстрация использует корневую директорию DBFS в качестве примера, но Databricks рекомендует использовать внешнее хранилище, настроенное с Unity Catalog.
Шаг 1. Настройка среды и создание генератора данных
В этом руководстве предполагается базовое знакомство с Azure Databricks и конфигурацией рабочей области по умолчанию. Если не удается запустить предоставленный код, обратитесь к администратору рабочей области, чтобы убедиться, что у вас есть доступ к вычислительным ресурсам и расположению, в которое можно записать данные.
Обратите внимание, что указанный код использует параметр source
для указания расположения, которое будет настроено в качестве источника данных COPY INTO
. Как записано, этот код указывает на расположение в корневом каталоге DBFS. Если у вас есть разрешение на запись в расположение внешнего хранилища объектов, замените часть исходной строки с указанием части dbfs:/
путем указания пути к вашему хранилищу объектов. Так как этот блок кода также выполняет рекурсивное удаление для reset этой демонстрации, убедитесь, что вы не указываете на рабочие данные и сохраняете вложенный каталог /user/{username}/copy-into-demo
, чтобы избежать перезаписи или удаления существующих данных.
Создайте новую записную книжку SQL и подключите ее к кластеру под управлением Databricks Runtime 11.3 LTS или более поздней версии.
Скопируйте и запустите следующий код, чтобы reset расположении хранилища и базе данных, используемой в этом руководстве:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Скопируйте и запустите следующий код, чтобы настроить некоторые tables и функции, которые будут использоваться для случайных generate данных:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
Шаг 2. Запись примеров данных в облачное хранилище
Запись в форматы данных, отличные от Delta Lake, редко используется в Azure Databricks. Приведенный здесь код записывается в JSON, имитируя внешнюю систему, которая может дампать результаты из другой системы в хранилище объектов.
Скопируйте и запустите следующий код для записи пакета необработанных данных JSON:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
Шаг 3. Загрузка идемпотентных данных JSON с помощью COPY INTO
Прежде чем использовать COPY INTO
, необходимо создать Delta Lake table в качестве целевой. В Databricks Runtime 11.3 LTS и более поздних версиях вам не нужно предоставлять ничего, кроме имени table в инструкции CREATE TABLE
. Для предыдущих версий Databricks Runtime необходимо предоставить schema при создании пустой table.
Скопируйте и запустите следующий код, чтобы создать целевую Delta table и загрузить данные из вашего источника.
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Так как это действие является идемпотентным, его можно запустить несколько раз, но данные будут загружаться только один раз.
Шаг 4: Просмотр содержимого table
Вы можете запустить простой SQL-запрос, чтобы вручную просмотреть содержимое этого table.
Скопируйте и выполните следующий код, чтобы просмотреть table:
-- Review updated table SELECT * FROM user_ping_target
Шаг 5. Загрузка дополнительных данных и результатов предварительной версии
Вы можете многократно повторно выполнять шаги 2-4, чтобы получить новые пакеты случайных необработанных данных JSON в вашем источнике, идемпотентно загрузить их в Delta Lake с помощью COPY INTO
и просмотреть результаты. Попробуйте выполнить эти шаги не по порядку или многократно, чтобы имитировать создание нескольких пакетов необработанных данных, выполняя COPY INTO
многократно без поступления новых данных having.
Шаг 6. Руководство по очистке
Когда вы закончите работу с этим руководством, вы можете очистить связанные ресурсы, если вы больше не хотите их сохранить.
Скопируйте и запустите следующий код, чтобы удалить базу данных, tablesи remove все данные:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Чтобы остановить вычислительный ресурс, перейдите на вкладку Кластеры и завершите кластер.
Дополнительные ресурсы
- Справочная статья COPY INTO