Streaming und inkrementelle Aufnahme
Azure Databricks verwendet Apache Spark Structured Streaming zur Unterstützung zahlreicher Produkte, die mit Ingestion-Workloads verbunden sind, einschließlich:
- Autoloader
COPY INTO
- Delta Live Tables-Pipelines
- Materialisierte Ansichten und Streaming-Tabellen in Databricks SQL
In diesem Artikel werden einige der Unterschiede zwischen der Streaming- und der inkrementellen Stapelverarbeitungssemantik erörtert und ein Überblick über die Konfiguration von Ingestion-Workloads für die gewünschte Semantik in Databricks gegeben.
Was ist der Unterschied zwischen Streaming und inkrementeller Batchaufnahme?
Die möglichen Konfigurationen des Ingestion-Workflows reichen von einer nahezu in Echtzeit erfolgenden Verarbeitung bis hin zu einer seltenen inkrementellen Stapelverarbeitung. Beide Muster verwenden Apache Spark Structured Streaming, um die inkrementelle Verarbeitung zu aktivieren, weisen jedoch unterschiedliche Semantik auf. Aus Gründen der Einfachheit bezieht sich dieser Artikel auf nahezu Echtzeitaufnahme als Streaming-Ingestion und seltener inkrementeller inkrementeller Verarbeitung als inkrementelle Batchaufnahme.
Streamingerfassung
Streaming bezieht sich im Zusammenhang mit der Datenaufnahme und Tabellenaktualisierung auf eine Datenverarbeitung nahezu in Echtzeit, bei der Azure Databricks Datensätze von der Quelle bis zur Senke in Mikrobatches unter Verwendung einer ständig aktiven Infrastruktur aufnimmt. Eine Streamingworkload erfasst kontinuierlich Updates aus konfigurierten Datenquellen, es sei denn, ein Fehler tritt auf, der die Aufnahme beendet.
Inkrementelle Batchaufnahme
Inkrementelle Batch-Ingestion bezieht sich auf ein Muster, bei dem alle neuen Datensätze aus einer Datenquelle in einem kurzlebigen Job verarbeitet werden. Die inkrementelle Batch-Ingestion erfolgt häufig nach einem Zeitplan, kann aber auch manuell oder auf der Grundlage des Eingangs von Dateien ausgelöst werden.
Die inkrementelle Batch-Ingestion unterscheidet sich von der Batch-Ingestion darin, dass neue Datensätze in der Datenquelle automatisch erkannt und Datensätze ignoriert werden, die bereits aufgenommen wurden.
Ingestion mit Jobs
Mit Databricks Jobs können Sie Workflows orchestrieren und Aufgaben planen, die Notebooks, Bibliotheken, Delta Live Tables-Pipelines und Databricks-SQL-Abfragen umfassen.
Hinweis
Sie können alle Computetypen und Aufgabentypen von Azure Databricks verwenden, um die inkrementelle Batch-Ingestion zu konfigurieren. Streaming-Ingestion wird nur in der Produktion für klassische Jobs Compute und Delta Live-Tabellen unterstützt.
Jobs weisen zwei primäre Betriebsmodi auf:
- Fortlaufende Jobs werden automatisch wiederholt, wenn ein Fehler auftritt. Dieser Modus ist für das Ingestion des Streamings vorgesehen.
- Ausgelöste Jobs führen Aufgaben aus, wenn sie ausgelöst werden. Zu den Triggern gehören:
- Zeitbasierte Trigger, die Jobs für einen bestimmten Zeitplan ausführen.
- Dateibasierte Trigger, die Jobs ausführen, wenn Dateien an einem angegebenen Speicherort landen.
- Andere Trigger wie REST-API-Aufrufe, Ausführung von Azure Databricks CLI-Befehlen oder Klicken auf die Schaltfläche Jetzt ausführen in der Arbeitsbereich-Benutzeroberfläche.
Konfigurieren Sie Ihre Aufträge für inkrementelle Batch-Workloads mithilfe des AvailableNow
-Triggermodus wie folgt:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
Für Streaming-Workloads ist das standardmäßige Triggerintervall processingTime ="500ms"
. Das folgende Beispiel zeigt, wie Sie einen Mikrobatch alle fünf Sekunden verarbeiten:
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
Wichtig
Serverlose Jobs unterstützen keine Scala-, fortlaufenden oder zeitbasierten Triggerintervalle für strukturiertes Streaming. Verwenden Sie klassische Jobs, wenn Sie nahezu echtzeitbasierte Aufnahmesemantik benötigen.
Ingestion mit Delta Live Tables
Ähnlich wie bei Jobs können Delta Live Tables-Pipelines entweder im ausgelösten oder im kontinuierlichen Modus laufen. Für echtzeitnahe Streaming-Semantik mit Streaming-Tabellen sollten Sie den kontinuierlichen Modus verwenden.
Verwenden Sie Streaming-Tabellen, um Streaming oder die inkrementelle Batch-Ingestion von Cloud Object Storage, Apache Kafka, Amazon Kinesis, Google Pub/Sub oder Apache Pulsar zu konfigurieren.
LakeFlow Connect verwendet Delta Live Tables zum Konfigurieren von Ingestion-Pipelines von verbundenen Systemen. Weitere Informationen finden Sie unter LakeFlow Connect.
Materialisierte Ansichten garantieren eine Operationssemantik, die der von Batch-Workloads entspricht, können aber viele Operationen optimieren, um Ergebnisse inkrementell zu berechnen. Siehe Aktualisierungsvorgänge für materialisierte Sichten.
Ingestion mit Databricks SQL
Sie können Streaming-Tabellen verwenden, um die inkrementelle Batch-Ingestion von Cloud Object Storage, Apache Kafka, Amazon Kinesis, Google Pub/Sub oder Apache Pulsar zu konfigurieren.
Sie können materialisierte Ansichten verwenden, um die inkrementelle Batchverarbeitung aus Quellen zu konfigurieren, die für einen bestimmten Satz von Vorgängen vollständig wiedergegeben werden können. Siehe Aktualisierungsvorgänge für materialisierte Sichten.
COPY INTO
stellt die vertraute SQL-Syntax für die inkrementelle Batchverarbeitung für Datendateien im Cloudobjektspeicher bereit. Das Verhalten von COPY INTO
ähnelt den Mustern, die von Streaming-Tabellen für Cloud-Objektspeicher unterstützt werden, aber nicht alle Standardeinstellungen sind für alle unterstützten Dateiformate gleichwertig.