Преобразование данных с помощью Delta Live Tables
В этой статье описывается, как использовать Delta Live Tables для объявления преобразований в наборах данных и указания способа обработки записей с помощью логики запроса. Он также содержит примеры распространенных шаблонов преобразования для создания конвейеров Delta Live Tables.
Набор данных можно определить для любого запроса, возвращающего кадр данных. Встроенные операции Apache Spark, пользовательские определяемые функции (UDF), пользовательская логика и модели MLflow можно использовать в качестве преобразований в вашем конвейере Delta Live Tables. После приема данных в ваш конвейер Delta Live Tables вы можете определить новые наборы данных по отношению к исходным источникам для создания новых потоковых tables, материализованных viewsи views.
Чтобы узнать, как эффективно выполнять обработку состояния с Delta Live Tables, см. раздел Optimize про обработку состояния в Delta Live Tables с водяными знаками.
Когда использовать views, материализованные viewsи потоковые tables
При реализации запросов конвейера выберите лучший тип набора данных, чтобы убедиться, что они эффективны и поддерживаются.
Рекомендуется использовать представление для выполнения следующих действий:
- Разорвать большой или сложный запрос, который требуется для упрощения управления запросами.
- Проверьте промежуточные результаты с помощью ожиданий.
- Уменьшите затраты на хранение и вычислительные ресурсы для результатов, которые не нужно сохранять. Так как tables материализованы, им требуются дополнительные вычислительные ресурсы и ресурсы хранилища.
Рекомендуется использовать материализованное представление, когда:
- Несколько последующих запросов используют table. Так как views вычисляются по запросу, представление вычисляется повторно при каждом запросе представления.
- Другие конвейеры, задания или запросы используют ресурс table. Так как views не материализованы, их можно использовать только в одном конвейере.
- Вы хотите просмотреть результаты запроса во время разработки. Так как tables материализованы и могут просматриваться и запрашиваться за пределами конвейера, используя tables во время разработки, можно проверить правильность вычислений. После проверки преобразуйте запросы, которые не требуют материализации в views.
Рассмотрите возможность использования потоковой передачи table, когда:
- Запрос определяется для источника данных, который постоянно или постепенно растет.
- Результаты запроса должны вычисляться постепенно.
- Конвейеру требуется высокая пропускная способность и низкая задержка.
Примечание.
Определение tables потоков всегда проводится относительно источников потоковой передачи. Вы также можете использовать источники потоковой передачи для APPLY CHANGES INTO
применения обновлений из веб-каналов CDC. См. API-интерфейсы APPLY CHANGES: упрощение захвата данных об изменениях с помощью Delta Live Tables.
Исключите tables из целевого schema
Если вы должны вычислить промежуточные tables, не предназначенные для внешнего потребления, можно предотвратить их публикацию в schema с помощью ключевого слова TEMPORARY
. Временные tables по-прежнему хранят и обрабатывают данные в соответствии с семантикой Delta Live Tables, но не должны быть доступны за пределами текущего конвейера. Временная table сохраняется в течение всего времени существования конвейера, создающего её. Используйте следующий синтаксис для объявления временного tables:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Объедините потоковые tables и материализованные views в одном конвейере
Потоковая tables наследуют гарантии обработки структурированной потоковой передачи Apache Spark и настроены для обработки запросов из источников данных, в которые могут добавляться только новые данные. where Новые строки всегда вставляются в исходные table, а не изменяются.
Примечание.
Хотя по умолчанию для потоковой передачи tables требуются источники данных только для добавления, если потоковый источник является другим table, который требует обновлений или удаления, это поведение можно изменить с помощью флага skipChangeCommits.
Распространенный шаблон потоковой передачи включает прием исходных данных для создания начальных наборов данных в конвейере. Эти начальные наборы данных обычно называются бронзовыми tables и часто выполняют простые преобразования.
Напротив, окончательные tables в конвейере, обычно называемые золотыми tables, часто требуют сложных агрегирований или считывания из целевых объектов для операции APPLY CHANGES INTO
. Поскольку эти операции по сути создают обновления, а не добавляются, они не поддерживаются в качестве входных данных для потоковой передачи tables. Эти преобразования лучше подходят для материализованных views.
Смешав потоковую передачу tables и материализованные данные views в один конвейер, можно упростить конвейер, избежать дорогостоящего повторного приема или повторной обработки необработанных данных и получить полную мощность SQL для вычисления сложных агрегатов по эффективно закодированному и отфильтрованному набору данных. Такой тип смешанной обработки показан в следующем примере:
Примечание.
В этих примерах используется автозагрузчик для загрузки файлов из облачного хранилища. Чтобы загрузить файлы с помощью Auto Loader в конвейере Unity Catalog, необходимо использовать внешние расположения . Дополнительные сведения о том, как использовать Unity Catalog с Delta Live Tables, см. в статье Использование Unity Catalog с вашими конвейерами Delta Live Tables.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return spark.readStream.table("LIVE.streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return spark.readStream.table("LIVE.streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Дополнительные сведения об использовании автозагрузчика для добавочного приема JSON-файлов из хранилища Azure.
Статические соединения потоковой передачи
Потоковые статические соединения являются хорошим выбором при денормализации непрерывного потока данных, доступных только для добавления, с главным образом статическим измерением table.
При каждом updateконвейера новые записи из потока объединяются с самой актуальной моментальной копией статического table. Если записи добавляются или обновляются в статической table после обработки соответствующих данных из потоковой table, результирующие записи не пересчитываются, если не выполнена полная refresh.
В конвейерах, настроенных для триггерного выполнения, статический table возвращает результаты по состоянию на момент начала update. В конвейерах, настроенных для непрерывного выполнения, последняя версия статического table запрашивается каждый раз, когда table обрабатывает update.
Ниже приведен пример потока «stream-static» join:
Python
@dlt.table
def customer_sales():
return spark.readStream.table("LIVE.sales").join(spark.readStream.table("LIVE.customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Эффективное вычисление статистических выражений
Вы можете использовать потоковую обработку tables для инкрементального вычисления простых дискретных агрегатов, таких как количество, минимум, максимум или сумма, а также алгебраических агрегатов, таких как среднее или стандартное отклонение. Databricks рекомендует добавочную агрегирование для запросов с ограниченным числом групп, таких как запрос с предложением GROUP BY country
. С каждым updateсчитываются только новые входные данные.
Дополнительные сведения о написании запросов Delta Live Tables, выполняющих инкрементные агрегирования, см. в статье Выполнение агрегирования с водяными знаками.
Использование моделей MLflow в конвейере Delta Live Tables
Примечание.
Чтобы использовать модели MLflow в конвейере с поддержкой Unity Catalog, необходимо настроить конвейер для использования канала preview
. Чтобы использовать current
канал, необходимо настроить конвейер для публикации в хранилище метаданных Hive.
Вы можете использовать модели, обученные с помощью MLflow, в потоках работ Delta Live Tables. Модели MLflow рассматриваются как преобразования в Azure Databricks, что означает, что они действуют на входных данных Spark DataFrame и возвращают результаты в качестве кадра данных Spark. Поскольку Delta Live Tables определяет наборы данных для DataFrame, вы можете преобразовать рабочие нагрузки Apache Spark, использующие MLflow, в Delta Live Tables, написав всего лишь несколько строк кода. Дополнительные сведения о MLflow см. в MLflow для жизненного цикла генеративных ИИ приложений и моделей.
Если у вас уже есть записная книжка Python, вызывающая модель MLflow, вы можете адаптировать этот код к Delta Live Tables с помощью декоратора @dlt.table
и обеспечения определения функций для возврата результатов преобразования. Delta Live Tables по умолчанию не устанавливает MLflow, поэтому убедитесь, что вы установили библиотеки MLflow с помощью %pip install mlflow
и импортировали mlflow
и dlt
в начале вашей записной книжки. Общие сведения о синтаксисе Delta Live Tables см. в статье Разработка кода конвейера с помощью Python.
Чтобы использовать модели MLflow в Delta Live Tables, выполните следующие действия:
- Получите идентификатор выполнения и имя модели MLflow. Идентификатор выполнения и имя модели используются для создания URI модели MLflow.
- Используйте URI для определения UDF Spark для загрузки модели MLflow.
- Вызовите UDF в определениях table для использования модели MLflow.
В следующем примере показан базовый синтаксис для этого шаблона:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return spark.read.table(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
В качестве полного примера следующий код определяет UDF Spark с именем loaded_model_udf
, который загружает модель MLflow, обученную по данным риска кредита. Данные columns, используемые для прогнозирования, передаются в качестве аргумента в UDF.
table
loan_risk_predictions
вычисляет прогнозы для каждой строки в loan_risk_input_data
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return spark.read.table("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Сохранение удалений или обновлений вручную
Delta Live Tables позволяет вручную удалять или update записи из table и выполнять операцию refresh для повторной компиляции tablesниже.
По умолчанию Delta Live Tables перекомпьютерирует результаты table на основе входных данных при каждом обновлении конвейера, поэтому необходимо убедиться, что удаленная запись не перезагрузится из исходных данных. Установка свойства pipelines.reset.allowed
table на false
предотвращает обновление table, но не предотвращает инкрементальную запись в tables или поступление новых данных в table.
На следующей схеме показан пример использования двух потоковых tables:
-
raw_user_table
прием необработанных данных пользователя из источника. -
bmi_table
последовательно вычисляет оценки индекса массы тела (ИМТ), используя значения веса и роста изraw_user_table
.
Вы хотите вручную удалить или update записи пользователей из raw_user_table
и перекомпьютировать bmi_table
.
В следующем коде показано, как настроить свойство pipelines.reset.allowed
table на false
, чтобы отключить полные refresh для raw_user_table
, чтобы предполагаемые изменения сохранялись с течением времени, но нижестоящий tables выполняется повторно при запуске конвейера update:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);