Поделиться через


Структурированные контрольные точки потоковой передачи

Контрольные точки и журналы перед записью работают вместе, чтобы обеспечить гарантии обработки для структурированных рабочих нагрузок потоковой передачи. Контрольная точка отслеживает сведения, определяющие запрос, включая сведения о состоянии и обработанные записи. При удалении файлов в каталоге контрольных точек или изменении в новом расположении контрольной точки следующий запуск запроса начинается свежим.

Каждый запрос должен иметь свое расположение контрольной точки. Несколько запросов никогда не должны совместно использовать одно расположение.

Включение контрольных точек для запросов структурированной потоковой передачи

Перед выполнением потокового запроса необходимо указать checkpointLocation этот параметр, как показано в следующем примере:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Примечание.

Некоторые приемники, такие как выходные данные display() для записных книжек и memory приемника, автоматически создают временное расположение контрольной точки, если этот параметр не указан. Эти временные расположения контрольных точек не гарантируют отказоустойчивость или гарантии согласованности данных и могут неправильно очиститься. Databricks рекомендует всегда указывать расположение контрольной точки для этих приемников.

Восстановление после изменений в структурированном запросе потоковой передачи

Существуют ограничения на то, какие изменения в запросе потоковой передачи разрешены между перезапусками из одного расположения контрольной точки. Ниже приведены некоторые изменения, которые либо не разрешены, либо эффект изменения не определен. Для всех из них:

  • Термин допустимый означает, что вы можете выполнить указанное изменение, но независимо от того, правильно ли определена семантика его воздействия, зависит от запроса и изменений.
  • Термин недопустимый означает, что не следует выполнять указанное изменение, так как перезапущенный запрос, скорее всего, завершится сбоем с непредсказуемыми ошибками.
  • sdf представляет DataFrame/Dataset потоковой передачи, созданный с помощью sparkSession.readStream.

Типы изменений в запросах структурированной потоковой передачи

  • Изменения в числе или типе источников входных данных (то есть другой источник) — это недопустимо.
  • Изменения в параметрах источников входных данных — разрешено ли это, и определяется ли семантика изменения четко, зависит от источника и запроса. Далее мы рассмотрим несколько примеров.
    • Добавление, удаление и изменение пределов скорости разрешено:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      до

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Изменения в статьях и файлах с подпиской обычно не разрешены, так как результаты непредсказуемы: spark.readStream.format("kafka").option("subscribe", "article") на spark.readStream.format("kafka").option("subscribe", "newarticle").

  • Изменения в интервале триггера: можно изменять триггеры между добавочными пакетами и интервалами времени. См. раздел "Изменение интервалов триггеров между запусками".
  • Изменения в типе приемника выходных данных — разрешены изменения между несколькими конкретными сочетаниями приемников. Это необходимо проверить отдельно для каждого случая. Далее мы рассмотрим несколько примеров.
    • Менять приемник файлов на приемник Kafka разрешено. Kafka увидит только новые данные.
    • Менять приемник Kafka на приемник файлов запрещено.
    • Менять приемник Kafka на foreach и наоборот разрешено.
  • Изменения в параметрах приемников выходных данных — разрешено ли это, и определяется ли семантика изменения четко, зависит от приемника и запроса. Далее мы рассмотрим несколько примеров.
    • Изменения в выходном каталоге приемника файлов запрещены: sdf.writeStream.format("parquet").option("path", "/somePath") на sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Изменения в выходном разделе разрешены: sdf.writeStream.format("kafka").option("topic", "topic1")sdf.writeStream.format("kafka").option("topic", "topic2")
    • Изменения в определенном пользователем приемнике foreach (т. е. в коде ForeachWriter) разрешены, но семантика изменения зависит от кода.
  • Изменения в операциях, подобных проекции, фильтру или сопоставлению. Некоторые варианты разрешены. Например:
    • Добавление и удаление фильтров разрешено: sdf.selectExpr("a") на sdf.where(...).selectExpr("a").filter(...).
    • Изменения в проекциях с одной и той же выходной схемой разрешены: sdf.selectExpr("stringColumn AS json").writeStream на sdf.select(to_json(...).as("json")).writeStream.
    • Изменения в проекциях с различной выходной схемой условно разрешены: sdf.selectExpr("a").writeStream на sdf.selectExpr("b").writeStream допускается только в том случае, если приемник выходных данных допускает изменение схемы с "a" на "b".
  • Изменения в операциях с отслеживанием состояния — некоторые операции в запросах потоковой передачи должны поддерживать данные состояния, чтобы постоянно обновлять результат. Структурированная потоковая передача автоматически создает контрольные точки для данных состояния в отказоустойчивом хранилище (например, DBFS, хранилище BLOB-объектов Azure) и восстанавливает его после перезагрузки. Однако при этом предполагается, что схема данных состояния не меняется после перезапуска. Это означает, что любые изменения (т. е. добавления, удаления или изменения схемы) в операциях запроса потоковой передачи с отслеживанием состояния не допускаются между перезапусками. Ниже приведен список операций с отслеживанием состояния, схема которых не должна быть изменена между перезапусками, чтобы обеспечить восстановление состояния:
    • Агрегат потоковой передачи — например, sdf.groupBy("a").agg(...). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • Дедупликация потоковой передачи — например, sdf.dropDuplicates("a"). Любое изменение числа или типа ключей группирования или агрегатов не допускается.
    • Соединение потока с потоком — например, sdf1.join(sdf2, ...) (т. е. оба входных потока создаются с помощью sparkSession.readStream). Изменения в схеме или эквивалентное соединение столбцов не допускаются. Изменения в типе соединения (внешнее или внутреннее) не допускаются. Другие изменения в условии соединения определены неверно.
    • Произвольная операция с отслеживанием состояния — например, sdf.groupByKey(...).mapGroupsWithState(...) или sdf.groupByKey(...).flatMapGroupsWithState(...). Любое изменение схемы определяемого пользователем состояния и типа времени ожидания не разрешено. Любое изменение в определяемой пользователем функции сопоставления состояний разрешено, но семантический результат изменения зависит от пользовательской логики. Если вы действительно хотите поддерживать изменения схемы состояния, то можете явно закодировать или декодировать сложные структуры данных о состоянии в байты, используя схему кодирования и декодирования, поддерживающую миграцию схемы. Например, если сохранить состояние в виде байтов в кодировке Avro, вы можете изменить значение Avro-state-schema между перезапусками запросов, так как при этом будет восстановлено двоичное состояние.