Verwenden von Delta Lake mit Streamingdaten
Bei allen Daten, die wir bisher erkundet haben, handelt es sich um statische Daten in Dateien. Viele Szenarien für die Datenanalyse beziehen jedoch Streamingdaten ein, die nahezu in Echtzeit verarbeitet werden müssen. Sie müssen beispielsweise Messwerte erfassen, die von IoT-Geräten (Internet of Things, Internet der Dinge) ausgegeben werden, und diese in einer Tabelle speichern, sobald sie auftreten.
Spark Structured Streaming
Eine typische Lösung für die Verarbeitung von Datenströmen sieht vor, dass Sie ständig einen Datenstrom aus einer Quelle lesen, ihn optional verarbeiten, um bestimmte Felder auszuwählen, Werte zu aggregieren und zu gruppieren oder die Daten anderweitig zu bearbeiten, und die Ergebnisse in eine Senke schreiben.
Spark bietet native Unterstützung für Streamingdaten mittels Spark Structured Streaming, einer API, die auf einem unbegrenzten DataFrame basiert, in dem Streamingdaten zur Verarbeitung erfasst werden. Ein Spark Structured Streaming-DataFrame kann Daten aus vielen verschiedenen Arten von Streamingquellen lesen, z. B. Netzwerkports, in Echtzeit arbeitende Nachrichtenbrokerdienste wie Azure Event Hubs oder Kafka oder Dateisysteme.
Tipp
Weitere Informationen zu Spark Structured Streaming finden Sie in der Spark-Dokumentation im Structured Streaming Programming Guide.
Streaming mit Delta Lake-Tabellen
Sie können eine Delta Lake-Tabelle als Quelle oder Senke für Spark Structured Streaming nutzen. So können Sie beispielsweise einen Echtzeitdatenstrom eines IoT-Geräts erfassen und den Datenstrom direkt als Senke in eine Delta Lake-Tabelle schreiben. Dadurch können Sie die Tabelle abfragen, um die neuesten gestreamten Daten einzusehen. Oder Sie können eine Delta-Tabelle als Streamingquelle lesen, sodass Sie ständig neue Daten melden können, sobald sie der Tabelle hinzugefügt werden.
Verwenden einer Delta Lake-Tabelle als Streamingquelle
Im folgenden PySpark-Beispiel wird eine Delta-Tabelle verwendet, um Details zu Internetbestellungen zu speichern. Es wird ein Datenstrom erstellt, der Daten aus dem Delta Lake-Tabellenordner liest, sobald neue Daten angehängt werden.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load a streaming dataframe from the Delta Table
stream_df = spark.readStream.format("delta") \
.option("ignoreChanges", "true") \
.load("/delta/internetorders")
# Now you can process the streaming data in the dataframe
# for example, show it:
stream_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
Hinweis
Wenn Sie eine Delta-Tabelle als Streamingquelle verwenden, können nur Anfügevorgänge in den Datenstrom einbezogen werden. Datenänderungen verursachen einen Fehler, es sei denn, Sie geben die Option ignoreChanges
oder ignoreDeletes
an.
Nachdem Sie die Daten aus der Delta-Tabelle in einen DataFrame eingelesen haben, können Sie sie mit der Spark Structured Streaming-API verarbeiten. Im obigen Beispiel wird der DataFrame einfach nur angezeigt. Sie könnten jedoch mit Spark Structured Streaming die Daten in Zeitfenstern aggregieren (z. B. um die Anzahl der Bestellungen pro Minute zu zählen) und die aggregierten Ergebnisse an einen nachgelagerten Prozess senden, um sie nahezu in Echtzeit zu visualisieren.
Verwenden einer Delta Lake-Tabelle als Streamingsenke
Im folgenden PySpark-Beispiel wird ein Datenstrom aus JSON-Dateien in einen Ordner eingelesen. Die JSON-Daten in jeder Datei enthalten den Status eines IoT-Geräts im Format {"device":"Dev1","status":"ok"}
. Neue Daten werden dem Datenstrom hinzugefügt, sobald eine Datei dem Ordner hinzugefügt wird. Der Eingabedatenstromstrom ist ein unbegrenzter DataFrame, der dann im Delta-Format an einen Ordnerspeicherort für eine Delta-Tabelle geschrieben wird.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Create a stream that reads JSON data from a folder
inputPath = '/streamingdata/'
jsonSchema = StructType([
StructField("device", StringType(), False),
StructField("status", StringType(), False)
])
stream_df = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write the stream to a delta table
table_path = '/delta/devicetable'
checkpoint_path = '/delta/checkpoint'
delta_stream = stream_df.writeStream.format("delta").option("checkpointLocation", checkpoint_path).start(table_path)
Hinweis
Die Option checkpointLocation
dient zum Schreiben einer Prüfpunktdatei, die den Status der Datenstromverarbeitung verfolgt. Diese Datei ermöglicht Ihnen, nach einem Ausfall an der Stelle fortzufahren, an der die Datenstromverarbeitung unterbrochen wurde.
Nach dem Start des Streamingprozesses können Sie die Delta Lake-Tabelle, in die die Streamingausgabe geschrieben wird, abfragen, um die neuesten Daten einzusehen. Der folgende Code erstellt zum Beispiel eine Katalogtabelle im Delta Lake-Tabellenordner und fragt sie ab:
%%sql
CREATE TABLE DeviceTable
USING DELTA
LOCATION '/delta/devicetable';
SELECT device, status
FROM DeviceTable;
Um den Datenstrom anzuhalten, der in die Delta Lake-Tabelle geschrieben wird, können Sie die stop
-Methode der Streamingabfrage verwenden:
delta_stream.stop()
Tipp
Weitere Informationen zum Arbeiten mit Delta Lake-Tabellen für Streamingdaten finden Sie in der Delta Lake-Dokumentation unter Tabelle: Lese-und Schreibvorgänge als Batchvorgang.