Справочник по языку SQL для разностных динамических таблиц
В этой статье содержатся сведения о интерфейсе программирования SQL Delta Live Tables.
- Дополнительные сведения об API Python см. в Справочнике по языку Python для разностных динамических таблиц.
- Дополнительные сведения о командах SQL см . в справочнике по языку SQL.
В запросах SQL можно использовать определяемые пользователем функции Python, но перед их вызовом в исходных файлах SQL необходимо определить эти определяемые пользователем функции Python. См . раздел "Определяемые пользователем скалярные функции " Python".
Ограничения
Предложение PIVOT
не поддерживается. Для pivot
выполнения операции в Spark требуется загрузка входных данных для вычисления выходной схемы. Эта возможность не поддерживается в разностных динамических таблицах.
Создание материализованного представления или потоковой таблицы Разностных динамических таблиц
Примечание.
- Синтаксис
CREATE OR REFRESH LIVE TABLE
для создания материализованного представления не рекомендуется. ИспользуйтеCREATE OR REFRESH MATERIALIZED VIEW
. - Чтобы использовать предложение для включения кластеризации жидкости, конвейер должен быть настроен для использования
CLUSTER BY
канала предварительной версии.
При объявлении таблицы потоковой передачи или материализованного представления используется тот же базовый синтаксис SQL.
Объявление материализованного представления разностных динамических таблиц с помощью SQL
Ниже описан синтаксис объявления материализованного представления в разностных динамических таблицах с помощью SQL:
CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Объявление таблицы потоковой передачи разностных динамических таблиц с помощью SQL
Таблицы потоковой передачи можно объявлять только с помощью запросов, которые считываются в источнике потоковой передачи. Databricks рекомендует использовать автозагрузчик для приема файлов из облачного хранилища объектов. См . синтаксис SQL автозагрузчика.
При указании других таблиц или представлений в конвейере в качестве источников потоковой передачи необходимо включить STREAM()
функцию вокруг имени набора данных.
Ниже описан синтаксис объявления таблицы потоковой передачи в разностных динамических таблицах с sql:
CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
[(
[
col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
[ table_constraint ] [, ...]
)]
[USING DELTA]
[PARTITIONED BY (col_name1, col_name2, ... )]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
[ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
AS select_statement
Создание представления разностных динамических таблиц
Ниже описан синтаксис для объявления представлений с помощью SQL:
CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
[(
[
col_name1 [ COMMENT col_comment1 ],
col_name2 [ COMMENT col_comment2 ],
...
]
[
CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
...
]
)]
[COMMENT view_comment]
AS select_statement
Синтаксис автоматического загрузчика SQL
Ниже описан синтаксис для работы с Автозагрузчиком в SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM read_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
С Автозагрузчиком можно использовать поддерживаемые параметры формата. map()
С помощью функции можно передать параметры методуread_files()
. Параметры — это пары "ключ-значение", где ключи и значения являются строками. Дополнительные сведения о форматах и параметрах поддержки см. в параметрах формата файлов.
Пример. Определение таблиц
Набор данных можно создать путем считывания из внешнего источника данных или наборов данных, определенных в конвейере. Для чтения из внутреннего набора данных, добавьте ключевое слово LIVE
в начало имени набора данных: В следующем примере определяется два различных набора данных: таблица с именем taxi_raw
, которая принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data
, принимающая таблицу taxi_raw
в качестве входных данных:
CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Пример. Чтение из источника потоковой передачи
Чтобы считывать данные из источника потоковой передачи, например автозагрузчик или внутренний набор данных, определите таблицу STREAMING
:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)
Дополнительные сведения о потоковой передаче данных см. в разделе "Преобразование данных с помощью разностных динамических таблиц".
Управление материализацией таблиц
Таблицы также обеспечивают дополнительный контроль над их материализацией:
- Укажите, как секционируются таблицы с помощью
PARTITIONED BY
. Секционирование можно использовать для ускорения запросов. - Свойства таблицы можно задать с помощью
TBLPROPERTIES
. См . свойства таблицы Delta Live Table. - Задайте место хранения с помощью параметра
LOCATION
. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, еслиLOCATION
не задано. - Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.
Примечание.
Для таблиц меньше 1 ТБ в размере Databricks рекомендует разрешить delta Live Tables управлять данными организации данных. Если вы не ожидаете, что таблица будет расти за пределами терабайта, Databricks рекомендует не указывать столбцы секций.
Пример. Указание столбцов схемы и секционирования
При выборе таблицы можно указать схему. В следующем примере указывается схема целевой таблицы, включая использование созданных столбцов Delta Lake и определение столбцов секционирования для таблицы:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
По умолчанию разностные динамические таблицы выводят схему из определения table
, если схема не указана.
Пример. Определение ограничений таблицы
Примечание.
Поддержка разностных динамических таблиц для ограничений таблиц доступна в общедоступной предварительной версии. Чтобы определить ограничения таблицы, конвейер должен быть конвейером с поддержкой каталога Unity и настроен для использования preview
канала.
При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. предложение CONSTRAINT в справочнике по языку SQL.
В следующем примере определяется таблица с ограничением первичного и внешнего ключа:
CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...
Параметризация значений, используемых при объявлении таблиц или представлений с помощью SQL
Используется SET
для указания значения конфигурации в запросе, объявляющего таблицу или представление, включая конфигурации Spark. Любая таблица или представление, определенные в записной книжке после того, как инструкция SET
получила доступ к определенному значению. Любые конфигурации Spark, заданные с помощью инструкции SET
, используются при выполнении запроса Spark для любой таблицы или представления после инструкции SET. Чтобы считать значение конфигурации в запросе, используйте синтаксис интерполяции строк ${}
. В следующем примере задается значение конфигурации Spark с именем startDate
, которое используется в запросе:
SET startDate='2020-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Чтобы указать несколько значений конфигурации, используйте отдельную инструкцию SET
для каждого значения.
Пример. Определение фильтра строк и маски столбцов
Внимание
Фильтры строк и маски столбцов находятся в общедоступной предварительной версии.
Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предложение ROW FILTER и предложение MASK. В следующем примере показано, как определить материализованное представление и таблицу Потоковой передачи с фильтром строк и маской столбца:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)
CREATE OR REFRESH MATERIALIZED VIEW sales (
customer_id STRING MASK catalog.schema.customer_id_mask_fn,
customer_name STRING,
number_of_line_items STRING COMMENT 'Number of items in the order',
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze
Дополнительные сведения о фильтрах строк и масках столбцов см. в статье "Публикация таблиц" с фильтрами строк и масками столбцов.
Свойства SQL
Примечание.
Чтобы использовать предложение для включения кластеризации жидкости, конвейер должен быть настроен для использования CLUSTER BY
канала предварительной версии.
CREATE TABLE или VIEW |
---|
TEMPORARY Создайте таблицу, но не публикуйте метаданные для таблицы. Предложение TEMPORARY указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления. |
STREAMING Создание таблицы, считывающей входной набор данных в виде потока. Входной набор данных должен быть источником потоковых данных, например автозагрузчиком или таблицей STREAMING . |
CLUSTER BY Включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. статью Использование "жидкой" кластеризации для таблиц Delta. |
PARTITIONED BY Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы. |
LOCATION Дополнительное место хранения данных таблицы. Если значение не задано, система будет по умолчанию использовать место хранения конвейера. |
COMMENT Необязательное описание таблицы. |
column_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа для столбца. |
MASK clause (общедоступная предварительная версия)Добавляет функцию маски столбца для анонимизации конфиденциальных данных. Будущие запросы для этого столбца возвращают результат вычисляемой функции вместо исходного значения столбца. Это полезно для точного управления доступом, так как функция может проверить удостоверение пользователя и членство в группах, чтобы решить, следует ли изменить значение. См . предложение "Маска столбца". |
table_constraint Необязательный информационный первичный ключ или ограничение внешнего ключа в таблице. |
TBLPROPERTIES Необязательный список свойств таблицы для таблицы. |
WITH ROW FILTER clause (общедоступная предварительная версия)Добавляет функцию фильтра строк в таблицу. Будущие запросы для этой таблицы получают подмножество строк, для которых функция оценивается как TRUE. Это полезно для точного управления доступом, так как она позволяет функции проверять удостоверения и членства в группах вызывающего пользователя, чтобы решить, следует ли фильтровать определенные строки. См . предложение ROW FILTER. |
select_statement Запрос разностной динамической таблицы, определяющий набор данных для таблицы. |
Предложение CONSTRAINT |
---|
EXPECT expectation_name Определение ограничения качества данных expectation_name . ON VIOLATION Если ограничение не определено, добавьте строки, которые нарушают ограничение целевого набора данных. |
ON VIOLATION Необязательное действие для неудачных строк: - FAIL UPDATE : Немедленно останавливает выполнение конвейера.- DROP ROW : Удаление записи и продолжение обработки. |
Изменение записи данных с помощью SQL в разностных динамических таблицах
Используйте инструкцию APPLY CHANGES INTO
для использования функций CDC Delta Live Table, как описано в следующем разделе:
CREATE OR REFRESH STREAMING TABLE table_name;
APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Вы определяете ограничения качества данных для целевого APPLY CHANGES
объекта, используя то же CONSTRAINT
предложение, что и запросы, отличныеAPPLY CHANGES
от запросов. См. статью Управление качеством данных с помощью Delta Live Tables.
Примечание.
Поведение по умолчанию для событий INSERT
и UPDATE
заключается в применении upsert к событиям CDC из источника — обновление всех записей в целевой таблице, которые совпадают с указанными ключами, или вставка новой записи, если совпадающая запись не существует в целевой таблице. Способ обработки событий DELETE
можно указать с помощью условия APPLY AS DELETE WHEN
.
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы APPLY CHANGES
необходимо также включить столбцы __START_AT
и __END_AT
с таким же типом данных, как в поле sequence_by
.
См . API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц.
Предложения |
---|
KEYS Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице. Чтобы определить сочетание столбцов, используйте разделенный запятыми список столбцов. Это предложение обязательно. |
IGNORE NULL UPDATES Разрешает прием обновлений с подмножеством целевых столбцов. Если событие CDC совпадает с существующей записью и используется IGNORE NULL UPDATES, столбцы с null сохранят существующие значения в целевом объекте. Это также относится к вложенным столбцам со значением null .Предложение не является обязательным. По умолчанию существующие столбцы перезаписываются значениями null . |
APPLY AS DELETE WHEN Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE , а не как upsert. Для обработки неупорядоченных данных удаленная запись временно сохраняется в виде отметки полного удаления в базовой разностной таблице, а в хранилище метаданных создается представление, которое отфильтровывает такие отметки. Интервал хранения можно настроить с помощью свойства таблицыpipelines.cdc.tombstoneGCThresholdInSeconds свойство table.Предложение не является обязательным. |
APPLY AS TRUNCATE WHEN Указывает, в каких случаях событие CDC необходимо обрабатывать как полную таблицу TRUNCATE . Так как это предложение активирует полное усечение целевой таблицы, его следует использовать только для конкретных вариантов использования, требующих этой функции.Предложение APPLY AS TRUNCATE WHEN поддерживается только для SCD типа 1. ScD типа 2 не поддерживает операцию усечения.Предложение не является обязательным. |
SEQUENCE BY Имя столбца, указывающего логический порядок событий CDC в исходных данных. Разностные динамические таблицы используют эту последовательность для обработки событий изменения, которые поступают неупорядоченно. Указанный столбец должен быть сортируемым типом данных. Это предложение обязательно. |
COLUMNS Указывает подмножество столбцов для включения в целевую таблицу. Вы можете сделать одно из двух: — укажите полный список столбцов, которые необходимо включить: COLUMNS (userId, name, city) — укажите список столбцов, которые следует исключить: COLUMNS * EXCEPT (operation, sequenceNum) Предложение не является обязательным. По умолчанию включаются все столбцы в целевой таблице, если не указано предложение COLUMNS . |
STORED AS Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2. Предложение не является обязательным. Значение по умолчанию — SCD типа 1. |
TRACK HISTORY ON Задает подмножество выходных столбцов для создания записей журнала при наличии изменений в указанных столбцах. Вы можете сделать одно из двух: — укажите полный список столбцов для отслеживания: COLUMNS (userId, name, city) — укажите список столбцов, которые следует исключить из отслеживания: COLUMNS * EXCEPT (operation, sequenceNum) Предложение не является обязательным. Значение по умолчанию — отслеживать журнал для всех выходных столбцов при наличии изменений, эквивалентных TRACK HISTORY ON * . |