Поделиться через


Optimize состояние обработки в Delta Live Tables с водяными знаками

Для эффективного управления данными, хранимыми в состоянии, используйте водяные знаки при обработке потоков с отслеживанием состояния в Delta Live Tables, включая агрегации, объединения и дедупликацию. В этой статье описывается, как использовать водяные знаки в ваших запросах Delta Live Tables и приведены примеры рекомендованных операций.

Примечание.

Чтобы убедиться, что запросы, которые выполняют агрегирование, обрабатываются постепенно и не полностью перекомпилируются с каждым update, необходимо использовать водяные знаки.

Что такое watermark?

В потоковой обработке функция watermark в Apache Spark может определять пороговое значение на основе времени для обработки данных при выполнении операций с отслеживанием состояния, таких как агрегации. Поступающие данные обрабатываются до тех пор, пока не будет достигнуто пороговое значение, после чего закрывается время window, определенное этим порогом. Водяные знаки можно использовать для предотвращения проблем во время обработки запросов, главным образом при обработке больших наборов данных или длительной обработки. Эти проблемы могут включать высокую задержку при создании результатов и даже ошибок вне памяти (OOM) из-за объема данных, хранящихся в состоянии во время обработки. Поскольку потоковые данные по своей природе неупорядочены, водяные знаки также поддерживают правильное вычисление таких операций, как агрегации по времени иwindow.

Дополнительные сведения об использовании подложек в потоковой обработке см. в разделе "Подложка" в Apache Spark Структурированная потоковая передача и применение подложки для управления порогами обработки данных.

Как определить watermark?

Вы определяете watermark, указав поле метки времени и значение, определяющее временной порог для поступления данных с задержками . Данные считаются поздними, если они поступают после определенного порогового значения времени. Например, если пороговое значение определяется как 10 минут, записи, поступающие после 10-минутного порогового значения, могут быть удалены.

Так как записи, поступающие после определенного порогового значения, могут быть удалены, важно выбрать пороговое значение, соответствующее требованиям задержки и правильности. Выбор меньшего порогового значения приводит к тому, что записи создаются раньше, но также означает, что поздние записи, скорее всего, будут удалены. Более большое пороговое значение означает более длительное ожидание, но, возможно, больше полноты данных. Из-за большего размера состояния также может потребоваться дополнительное вычислительное значение. Так как пороговое значение зависит от требований к данным и обработке, тестирование и мониторинг обработки важно определить оптимальное пороговое значение.

Функция withWatermark() в Python используется для определения watermark. В SQL используйте предложение WATERMARK для определения watermark:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Использовать водяные знаки с потоковыми соединениями

Для потоковых соединений необходимо определить watermark с обеих сторон от join и задать условие интервала времени. Так как каждый источник join имеет неполное представление о данных, условие временного интервала требуется для того, чтобы сообщить стриминговому движку, когда дальнейшие совпадения невозможны. Предложение интервала времени должно использовать те же поля, которые используются для определения подложек.

Так как в каждом потоке требуются разные пороговые значения для подложек, потоки не должны иметь одинаковые пороговые значения. Чтобы избежать отсутствия данных, модуль потоковой передачи поддерживает одну глобальную watermark на основе самого медленного потока.

В следующем примере присоединяется поток рекламных впечатлений и поток пользователей щелкает рекламу. В этом примере щелчк должен происходить в течение 3 минут от впечатления. После прохождения 3-минутного интервала времени строки из состояния, которое больше не может быть сопоставлено, удаляются.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Выполнение оконных агрегатов с подложками

Распространенная операция с отслеживанием состояния потоковых данных — это агрегирование с окном. Агрегации с окнами похожи на сгруппированные агрегации, за исключением того, что агрегаты values возвращаются для set строк, которые являются частью определенного window.

window можно определить как определенную длину, а операцию агрегирования можно выполнить во всех строках, входящих в эту window. Потоковая передача Spark поддерживает три типа окон:

  • Переворачивающиеся (фиксированные) окна: ряд фиксированных размеров, не перекрывающихся и смежных интервалов времени. Входная запись принадлежит только одной window.
  • Скользящие окна: как и переворачивающиеся окна, скользящие окна имеют фиксированный размер, но окна могут перекрываться, и запись может попасть в несколько окон.

Когда данные поступают после конца периода window плюс длина watermark, новые данные для windowне принимаются, результат агрегирования выдаётся, и состояние для window сбрасывается.

В следующем примере вычисляется сумма впечатлений каждые 5 минут с помощью фиксированной window. В этом примере предложение select использует псевдоним impressions_window, а сам window определяется как часть предложения GROUP BY. window должен основываться на той же метке времени column, что и watermark, clickTimestampcolumn в этом примере.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Аналогичный пример в Python для вычисления прибыли в течение часа фиксированного окна:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Дедупликация записей потоковой передачи

Структурированная потоковая передача имеет точно один раз гарантии обработки, но не автоматически отменяет повторяющиеся записи из источников данных. Например, так как многие очереди сообщений имеют по крайней мере один раз гарантии, при чтении из одной из этих очередей сообщений следует ожидать повторяющиеся записи. Функцию dropDuplicatesWithinWatermark() можно использовать для отмены повторяющихся записей в любом указанном поле, удаляя дубликаты из потока, даже если некоторые поля отличаются (например, время события или время прибытия). Чтобы использовать функцию dropDuplicatesWithinWatermark(), необходимо указать watermark. Все повторяющиеся данные, поступающие в диапазон времени, указанный watermark, удаляются.

Упорядоченные данные важны, так как данные, которые выходят из порядка, приводят к неправильному перепрыгиванию значения watermark. Затем, когда старые данные поступают, считается поздним и удаленным. Используйте параметр withEventTimeOrder для последовательной обработки начального моментального снимка согласно метке времени, указанной в watermark. Параметр withEventTimeOrder можно объявить в коде, определяющем набор данных или в параметрах конвейера с помощью spark.databricks.delta.withEventTimeOrder.enabled. Рассмотрим пример.

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Примечание.

Этот withEventTimeOrder параметр поддерживается только в Python.

В следующем примере данные обрабатываются в порядке clickTimestamp, а записи, поступающие в течение 5 секунд друг от друга, содержащие повторяющиеся userId и clickAdIdcolumns, удаляются.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimize конфигурации конвейера для обработки с отслеживанием состояния

Чтобы предотвратить проблемы с рабочей средой и чрезмерную задержку, Databricks рекомендует включить управление состоянием на основе RocksDB для обработки потоков с отслеживанием состояния, особенно если обработка требует сохранения большого количества промежуточного состояния.

Конвейеры без использования автоматически управляют конфигурациями хранилища состояний.

Вы можете включить управление состоянием на основе RocksDB, задав следующую конфигурацию перед развертыванием конвейера:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Дополнительные сведения о хранилище состояний RocksDB, включая рекомендации по настройке Для RocksDB, см. в статье Настройка хранилища состояний RocksDB в Azure Databricks.