Справочник по языку для разностных динамических таблиц Python
В этой статье содержатся сведения о интерфейсе программирования Python для Delta Live Tables.
Дополнительные сведения об API SQL см. в Справочнике по языку SQL для разностных динамических таблиц.
Дополнительные сведения о настройке автозагрузчика см. в разделе "Что такое автозагрузчик?".
Перед началом работы
При реализации конвейеров с помощью интерфейса Python Delta Live Tables следует учитывать следующее:
- Так как Python
table()
иview()
функции вызываются несколько раз во время планирования и выполнения обновления конвейера, не включают код в одну из этих функций, которые могут иметь побочные эффекты (например, код, который изменяет данные или отправляет сообщение электронной почты). Чтобы избежать непредвиденного поведения, функции Python, определяющие наборы данных, должны содержать только код, необходимый для определения таблицы или представления. - Для выполнения таких операций, как отправка сообщений электронной почты или интеграция с внешней службой мониторинга, особенно в функциях, определяющих наборы данных, используйте перехватчики событий. Реализация этих операций в функциях, определяющих наборы данных, приведет к неожиданному поведению.
- Python
table
иview
функции должны возвращать кадр данных. Некоторые функции, работающие с кадрами данных, не возвращают кадры данных и не должны использоваться. Эти операции включают такие функции, какcollect()
,count()
,toPandas()
,save()
иsaveAsTable()
. Так как преобразования DataFrame выполняются после разрешения полного графа потока данных, при использовании таких операций могут быть непреднамеренные побочные эффекты.
dlt
Импорт модуля Python
Функции Python разностных динамических таблиц определяются в модуле dlt
. Конвейеры, реализованные с помощью API Python, должны импортировать этот модуль:
import dlt
Создание материализованного представления или потоковой таблицы Разностных динамических таблиц
В Python разностные динамические таблицы определяют, следует ли обновлять набор данных в виде материализованного представления или потоковой таблицы на основе определяющего запроса. Декоратор @table
можно использовать для определения материализованных представлений и потоковых таблиц.
Чтобы определить материализованное представление в Python, примените @table
к запросу, который выполняет статическое чтение в источнике данных. Чтобы определить таблицу потоковой передачи, примените @table
к запросу, который выполняет потоковое чтение в источнике данных или используйте функцию create_streaming_table(). Оба типа набора данных имеют одну и ту же спецификацию синтаксиса, как показано ниже.
Примечание.
Чтобы использовать cluster_by
аргумент для включения кластеризации жидкости, конвейер должен быть настроен для использования канала предварительной версии.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Создание представления разностных динамических таблиц
Чтобы определить представление в Python, примените декоратор @view
. @table
Как и декоратор, можно использовать представления в разностных динамических таблицах для статических или потоковых наборов данных. Ниже приведен синтаксис для определения представлений с помощью Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Пример. Определение таблиц и представлений
Чтобы определить таблицу или представление в Python, примените @dlt.view
к функции или @dlt.table
декоратору. Для назначения имени таблицы или представления можно использовать имя функции или параметр name
. В следующем примере определяются два различных набора данных: представление с именем taxi_raw
, которое принимает JSON-файл в качестве источника входных данных, и таблица с именем filtered_data
, принимающая представление taxi_raw
в качестве входных данных:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Пример. Доступ к набору данных, определенному в том же конвейере
Примечание.
dlt.read()
Хотя и dlt.read_stream()
функции по-прежнему доступны и полностью поддерживаются интерфейсом Python Delta Live Tables, Databricks рекомендует всегда использовать spark.read.table()
и spark.readStream.table()
функции из-за следующих действий:
- Функции
spark
поддерживают чтение внутренних и внешних наборов данных, включая наборы данных во внешнем хранилище или определенные в других конвейерах. Функцииdlt
поддерживают только чтение внутренних наборов данных. - Функции
spark
поддерживают указание параметров, таких какskipChangeCommits
операции чтения. Указание параметров не поддерживается функциямиdlt
.
Чтобы получить доступ к набору данных, определенному в том же конвейере, используйте spark.read.table()
или spark.readStream.table()
функции, указав LIVE
ключевое слово в имя набора данных:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Пример. Чтение из таблицы, зарегистрированной в хранилище метаданных
Чтобы считывать данные из таблицы, зарегистрированной в хранилище метаданных Hive, в аргументе функции опустите LIVE
ключевое слово и при необходимости укажите имя таблицы с именем базы данных:
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Пример чтения из таблицы каталога Unity см . в конвейере каталога Unity.
Пример. Доступ к набору данных с помощью spark.sql
Можно также вернуть набор данных с помощью выражения spark.sql
в функции запроса. Для чтения из внутреннего набора данных, добавьте в начало LIVE.
к имени набора данных:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Создание таблицы для использования в качестве цели операций потоковой передачи
create_streaming_table()
Используйте функцию для создания целевой таблицы для выходных данных записей операций потоковой передачи, включая apply_changes(), apply_changes_from_snapshot() и @append_flow выходные записи.
Примечание.
create_streaming_live_table()
Не create_target_table()
рекомендуется использовать функции. В Databricks рекомендуется обновить существующий код для использования функции create_streaming_table()
.
Примечание.
Чтобы использовать cluster_by
аргумент для включения кластеризации жидкости, конвейер должен быть настроен для использования канала предварительной версии.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Аргументы |
---|
name Тип: str Имя таблицы. Этот параметр является обязательным. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств таблицы для таблицы. |
partition_cols Тип: array Необязательный список из одного или нескольких столбцов, используемых для секционирования таблицы. |
cluster_by Тип: array При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. статью Использование "жидкой" кластеризации для таблиц Delta. |
path Тип: str Дополнительное место хранения данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера. |
schema Тип: str или StructType .Необязательное определение схемы для таблицы. Схемы можно определить как строку DDL SQL или с помощью Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Тип: dict Необязательные ограничения качества данных для таблицы. См . несколько ожиданий. |
row_filter (общедоступная предварительная версия)Тип: str Необязательное предложение фильтра строк для таблицы. См. статью "Публикация таблиц" с фильтрами строк и масками столбцов. |
Управление материализацией таблиц
Таблицы также обеспечивают дополнительный контроль над их материализацией:
- Укажите, как секционируются таблицы с помощью
partition_cols
. Секционирование можно использовать для ускорения запросов. - Свойства таблицы можно задать при определении представления или таблицы. См . свойства таблицы Delta Live Table.
- Задайте место хранения данных таблицы с помощью параметра
path
. По умолчанию данные таблицы хранятся в расположении хранилища конвейера, еслиpath
не задано. - Созданные столбцы можно использовать в определении схемы. См . пример. Указание столбцов схемы и секционирования.
Примечание.
Для таблиц меньше 1 ТБ в размере Databricks рекомендует разрешить delta Live Tables управлять данными организации данных. Не следует указывать столбцы секций, если вы не ожидаете, что таблица будет расти за пределами терабайта.
Пример. Указание столбцов схемы и секционирования
При необходимости можно указать схему таблицы с помощью Python StructType
или строки SQL DDL. При указании в строке DDL определение может включать созданные столбцы.
В следующем примере создается таблица sales
с схемой, указанной с помощью Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
В следующем примере схема таблицы с помощью строки DDL определяет созданный столбец и определяет столбец секционирования:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
По умолчанию разностные динамические таблицы выводят схему из определения table
, если схема не указана.
Настройка таблицы потоковой передачи для пропуска изменений в исходной потоковой таблице
Примечание.
- Флаг
skipChangeCommits
работает только сspark.readStream
помощьюoption()
функции. Этот флаг нельзя использовать вdlt.read_stream()
функции. - Нельзя использовать
skipChangeCommits
флаг, если исходная потоковая таблица определена как цель функции apply_changes().
По умолчанию для потоковых таблиц требуются источники только для добавления. Если в таблице потоковой передачи используется другая потоковая таблица в качестве источника, а исходная потоковая таблица требует обновления или удаления, например GDPR "право быть забытым", флаг можно задать при чтении исходной таблицы потоковой передачи, skipChangeCommits
чтобы игнорировать эти изменения. Дополнительные сведения об этом флаге см. в разделе "Игнорировать обновления и удаления".
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Пример. Определение ограничений таблицы
Внимание
Ограничения таблиц находятся в общедоступной предварительной версии.
При указании схемы можно определить первичные и внешние ключи. Ограничения являются информационными и не применяются. См. предложение CONSTRAINT в справочнике по языку SQL.
В следующем примере определяется таблица с ограничением первичного и внешнего ключа:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Пример. Определение фильтра строк и маски столбцов
Внимание
Фильтры строк и маски столбцов находятся в общедоступной предварительной версии.
Чтобы создать материализованное представление или таблицу потоковой передачи с фильтром строк и маской столбцов, используйте предложение ROW FILTER и предложение MASK. В следующем примере показано, как определить материализованное представление и таблицу Потоковой передачи с фильтром строк и маской столбца:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Дополнительные сведения о фильтрах строк и масках столбцов см. в статье "Публикация таблиц" с фильтрами строк и масками столбцов.
Свойства разностных динамических таблиц Python
В следующих таблицах описаны параметры и свойства, которые можно указать при определении таблиц и представлений с помощью разностных динамических таблиц:
Примечание.
Чтобы использовать cluster_by
аргумент для включения кластеризации жидкости, конвейер должен быть настроен для использования канала предварительной версии.
@table или @view |
---|
name Тип: str Необязательное имя таблицы или представления. Если значение не определено, имя функции используется в качестве имени таблицы или представления. |
comment Тип: str Необязательное описание таблицы. |
spark_conf Тип: dict Необязательный список конфигураций Spark для выполнения этого запроса. |
table_properties Тип: dict Необязательный список свойств таблицы для таблицы. |
path Тип: str Дополнительное место хранения данных таблицы. Если не задано, система по умолчанию использует расположение хранилища конвейера. |
partition_cols Тип: a collection of str Необязательная коллекция, например list один или несколько столбцов, используемых для секционирования таблицы. |
cluster_by Тип: array При необходимости включите кластеризацию жидкости в таблице и определите столбцы, используемые в качестве ключей кластеризации. См. статью Использование "жидкой" кластеризации для таблиц Delta. |
schema Тип: str или StructType .Необязательное определение схемы для таблицы. Схемы можно определить как строку DDL SQL или Python StructType . |
temporary Тип: bool Создайте таблицу, но не публикуйте метаданные для таблицы. Ключевое temporary слово указывает Delta Live Table создать таблицу, доступную конвейеру, но не должен быть доступ к ней за пределами конвейера. Чтобы сократить время обработки, временная таблица сохраняется в течение всего времени существования конвейера, создающего его, а не только одного обновления.Значение по умолчанию —False. |
row_filter (общедоступная предварительная версия)Тип: str Необязательное предложение фильтра строк для таблицы. См. статью "Публикация таблиц" с фильтрами строк и масками столбцов. |
Определение таблицы или представления |
---|
def <function-name>() Функция Python, определяющая набор данных. name Если параметр не задан, то <function-name> используется в качестве имени целевого набора данных. |
query Инструкция Spark SQL, возвращающая набор данных Spark или Koalas DataFrame. Используйте dlt.read() или spark.read.table() для выполнения полного считывания из набора данных, определенного в том же конвейере. Чтобы прочитать внешний набор данных, используйте функцию spark.read.table() . Нельзя использовать dlt.read() для чтения внешних наборов данных. Так как spark.read.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо dlt.read() функции.При использовании spark.read.table() функции для чтения из набора данных, определенного в том же конвейере, добавьте LIVE ключевое слово в имя набора данных в аргументе функции. Например, для чтения из набора данных с именем customers :spark.read.table("LIVE.customers") Можно также использовать функцию spark.read.table() для чтения из таблицы, зарегистрированной в хранилище метаданных, пропустив ключевое слово LIVE , и при необходимости дополнить имя таблицы именем базы данных:spark.read.table("sales.customers") Использование dlt.read_stream() или spark.readStream.table() выполнение потокового чтения из набора данных, определенного в том же конвейере. Для выполнения потоковой передачи из внешнего набора данных используйтеspark.readStream.table() функция. Так как spark.readStream.table() можно использовать для чтения внутренних наборов данных, наборов данных, определенных вне текущего конвейера, и позволяет указать параметры чтения данных, Databricks рекомендует использовать его вместо dlt.read_stream() функции.Чтобы определить запрос в функции Delta Live Table table с помощью синтаксиса SQL, используйте функцию spark.sql . См . пример. Доступ к набору данных с помощью spark.sql. Чтобы определить запрос в функции Delta Live Table table с помощью Python, используйте синтаксис PySpark . |
Ожидания |
---|
@expect("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, включите строку в целевой набор данных. |
@expect_or_drop("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, удалите строку из целевого набора данных. |
@expect_or_fail("description", "constraint") Объявите ограничение качества данных, определяемое с помощью description . Если строка нарушает ожидание, немедленно прервите выполнение. |
@expect_all(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, включите строку в целевой набор данных. |
@expect_all_or_drop(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, удалите строку из целевого набора данных. |
@expect_all_or_fail(expectations) Объявите одно или несколько ограничений качества данных. expectations — это словарь Python, в котором ключ является описанием ожидания, а значение — ограничением ожидания. Если строка нарушает любое ожидание, немедленно прервите выполнение. |
Изменение записи данных из канала изменений с помощью Python в разностных динамических таблицах
apply_changes()
Используйте функцию в API Python, чтобы использовать функцию записи измененных данных в разностных динамических таблицах (CDC) для обработки исходных данных из веб-канала изменений (CDF).
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой apply_changes()
таблицы необходимо включить __START_AT
столбцы с тем же типом данных, что sequence_by
и __END_AT
поля.
Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Примечание.
Для APPLY CHANGES
обработки поведение по умолчанию для INSERT
и UPDATE
событий требуется обновить события CDC из источника: обновить все строки в целевой таблице, соответствующие указанным ключам, или вставить новую строку, если соответствующая запись не существует в целевой таблице. Способ обработки событий DELETE
можно указать с помощью условия APPLY AS DELETE WHEN
.
Дополнительные сведения об обработке CDC с помощью канала изменений см. в разделе API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц. Пример использования apply_changes()
функции см. в статье Example: SCD type 1 and SCD type 2 processing with CDF source data.
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой apply_changes
таблицы необходимо включить __START_AT
столбцы с тем же типом данных, что sequence_by
и __END_AT
поле.
См . API APPLY CHANGES: упрощение отслеживания изменений с помощью разностных динамических таблиц.
Аргументы |
---|
target Тип: str Имя обновляемой таблицы. Функцию create_streaming_table() можно использовать для создания целевой таблицы перед выполнением apply_changes() функции.Этот параметр является обязательным. |
source Тип: str Источник данных с записями CDC. Этот параметр является обязательным. |
keys Тип: list Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице. Вы можете указать одно из следующего: — список строк: ["userId", "orderId"] — список функций Spark SQL col() : [col("userId"), col("orderId"] Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Этот параметр является обязательным. |
sequence_by Тип: str или col() .Имя столбца, указывающего логический порядок событий CDC в исходных данных. Разностные динамические таблицы используют эту последовательность для обработки событий изменения, которые поступают неупорядоченно. Вы можете указать одно из следующего: — строка: "sequenceNum" — Функция Spark SQL col() : col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Указанный столбец должен быть сортируемым типом данных. Этот параметр является обязательным. |
ignore_null_updates Тип: bool Разрешает прием обновлений с подмножеством целевых столбцов. Если событие CDC соответствует существующей строке и ignore_null_updates является True столбцами с null сохранением существующих значений в целевом объекте. Это также относится к вложенным столбцам со значением null . Если ignore_null_updates это False так, существующие значения перезаписываются со значениями null .Это необязательный параметр. Значение по умолчанию — False . |
apply_as_deletes Тип: str или expr() .Указывает, в каких случаях событие CDC необходимо обрабатывать как DELETE , а не как upsert. Для обработки неупорядоченных данных удаленная запись временно сохраняется в виде отметки полного удаления в базовой разностной таблице, а в хранилище метаданных создается представление, которое отфильтровывает такие отметки. Интервал хранения можно настроить с помощью свойства таблицыpipelines.cdc.tombstoneGCThresholdInSeconds свойство table.Вы можете указать одно из следующего: — строка: "Operation = 'DELETE'" — Функция Spark SQL expr() : expr("Operation = 'DELETE'") Это необязательный параметр. |
apply_as_truncates Тип: str или expr() .Указывает, в каких случаях событие CDC необходимо обрабатывать как полную таблицу TRUNCATE . Так как это предложение активирует полное усечение целевой таблицы, его следует использовать только для конкретных вариантов использования, требующих этой функции.Параметр apply_as_truncates поддерживается только для SCD типа 1. ScD type 2 не поддерживает операции усечения.Вы можете указать одно из следующего: — строка: "Operation = 'TRUNCATE'" — Функция Spark SQL expr() : expr("Operation = 'TRUNCATE'") Это необязательный параметр. |
column_list except_column_list Тип: list Подмножество столбцов для включения в целевую таблицу. Используйте column_list для указания полного списка включаемых столбцов. Используйте except_column_list для указания исключаемых столбцов. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Это необязательный параметр. По умолчанию включаются все столбцы в целевой таблице, если в функцию не передается аргумент column_list или except_column_list . |
stored_as_scd_type Тип: str или int .Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2.Предложение не является обязательным. Значение по умолчанию — SCD типа 1. |
track_history_column_list track_history_except_column_list Тип: list Подмножество выходных столбцов для отслеживания журнала в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Использованиеtrack_history_except_column_list , чтобы указать столбцы, которые следует исключить из отслеживания. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Это необязательный параметр. Значение по умолчанию — включать все столбцы в целевую таблицу, если нет или нет track_history_column_list .track_history_except_column_list аргумент передается функции. |
Изменение записи данных из моментальных снимков базы данных с помощью Python в разностных динамических таблицах
Внимание
APPLY CHANGES FROM SNAPSHOT
API находится в общедоступной предварительной версии.
apply_changes_from_snapshot()
Используйте функцию в API Python, чтобы использовать функцию "Разностные динамические таблицы" для обработки исходных данных из моментальных снимков базы данных.
Внимание
Для применения изменений необходимо объявить целевую таблицу потоковой передачи. При необходимости можно указать схему для целевой таблицы. При указании схемы целевой таблицы apply_changes_from_snapshot()
необходимо также включить столбцы __START_AT
и __END_AT
с таким же типом данных, как в поле sequence_by
.
Чтобы создать требуемую целевую таблицу, можно использовать функцию create_streaming_table() в интерфейсе Python delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Примечание.
Для APPLY CHANGES FROM SNAPSHOT
обработки поведение по умолчанию заключается в вставке новой строки, если соответствующая запись с теми же ключами не существует в целевом объекте. Если соответствующая запись существует, она обновляется только в том случае, если все значения в строке изменились. Строки с ключами, присутствующих в целевом объекте, но больше не присутствуют в источнике, удаляются.
Дополнительные сведения об обработке CDC с моментальными снимками см. в разделе API APPLY CHANGES: Упрощение отслеживания изменений с помощью разностных динамических таблиц. Примеры использования apply_changes_from_snapshot()
функции см. в примерах приема периодических моментальных снимков и исторических примеров приема моментальных снимков.
Аргументы |
---|
target Тип: str Имя обновляемой таблицы. Перед запуском функции можно использовать функцию create_streaming_table() для создания целевой apply_changes() таблицы.Этот параметр является обязательным. |
source Тип: str или lambda function .Имя таблицы или представления для моментального снимка или лямбда-функции Python, которая возвращает кадр моментальных снимков для обработки и версии моментального снимка. См. раздел "Реализация исходного аргумента". Этот параметр является обязательным. |
keys Тип: list Столбец или сочетание столбцов, которые однозначно идентифицируют запись в исходных данных. Используется для определения того, какие события CDC применяются к конкретным записям в целевой таблице. Вы можете указать одно из следующего: — список строк: ["userId", "orderId"] — список функций Spark SQL col() : [col("userId"), col("orderId"] Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Этот параметр является обязательным. |
stored_as_scd_type Тип: str или int .Определяет, следует ли хранить записи в виде SCD типа 1 или SCD типа 2. Укажите значение 1 для SCD типа 1 или 2 для SCD типа 2.Предложение не является обязательным. Значение по умолчанию — SCD типа 1. |
track_history_column_list track_history_except_column_list Тип: list Подмножество выходных столбцов для отслеживания журнала в целевой таблице. Используется track_history_column_list для указания полного списка столбцов для отслеживания. Использованиеtrack_history_except_column_list , чтобы указать столбцы, которые следует исключить из отслеживания. Вы можете объявить любое из этих значений в виде списка строк или в виде функций col() Spark SQL:- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Аргументы для функций col() не могут включать квалификаторы. Например, можно использовать col(userId) , но нельзя использовать col(source.userId) .Это необязательный параметр. Значение по умолчанию — включать все столбцы в целевую таблицу, если нет или нет track_history_column_list .track_history_except_column_list аргумент передается функции. |
Реализация аргумента source
Функция apply_changes_from_snapshot()
включает source
аргумент. Для обработки исторических моментальных снимков аргумент, как ожидается, будет лямбда-функцией Python, source
которая возвращает два значения apply_changes_from_snapshot()
функции: кадр данных Python, содержащий данные моментального снимка для обработки и версии моментального снимка.
Ниже приведена подпись лямбда-функции:
lambda Any => Optional[(DataFrame, Any)]
- Аргумент лямбда-функции является последней обработанной версией моментального снимка.
- Возвращаемое значение лямбда-функции —
None
это кортеж двух значений: первое значение кортежа — это кадр данных, содержащий моментальный снимок для обработки. Вторым значением кортежа является версия моментального снимка, представляющая логический порядок моментального снимка.
Пример, реализующий и вызывающий лямбда-функцию:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Среда выполнения Delta Live Tables выполняет следующие действия при каждом запуске конвейера, содержащего apply_changes_from_snapshot()
функцию:
next_snapshot_and_version
Запускает функцию, чтобы загрузить следующий кадр данных моментального снимка и соответствующую версию моментального снимка.- Если кадр данных не возвращается, выполнение завершается и обновление конвейера помечается как завершенное.
- Обнаруживает изменения в новом моментальном снимке и добавочно применяет их к целевой таблице.
- Возвращается к шагу 1, чтобы загрузить следующий моментальный снимок и его версию.
Ограничения
Интерфейс Python для разностных динамических таблиц имеет следующее ограничение:
Функция pivot()
не поддерживается. Для pivot
выполнения операции в Spark требуется загрузка входных данных для вычисления выходной схемы. Эта возможность не поддерживается в разностных динамических таблицах.