Entwickeln von Pipelinecode mit SQL
Delta Live Tables führt mehrere neue SQL-Schlüsselwörter und -Funktionen zum Definieren materialisierter Ansichten und Streamingtabellen in Pipelines ein. DIE SQL-Unterstützung für die Entwicklung von Pipelines basiert auf den Grundlagen von Spark SQL und bietet Unterstützung für strukturiertes Streaming.
Benutzer, die mit PySpark DataFrames vertraut sind, bevorzugen möglicherweise die Entwicklung von Pipelinecode mit Python. Python unterstützt umfangreichere Tests und Vorgänge, die mit SQL implementiert werden müssen, z. B. Metaprogrammierungsvorgänge. Siehe Entwickeln von Pipelinecode mit Python.
Eine vollständige Referenz zur SQL-Syntax von Delta Live Tables finden Sie in der SQL-Sprachreferenz für Delta Live Tables.
Grundlagen von SQL für die Pipelineentwicklung
SQL-Code, der Delta Live Tables-Datasets erstellt, verwendet die CREATE OR REFRESH
Syntax zum Definieren materialisierter Ansichten und Streamingtabellen für Abfrageergebnisse.
Das STREAM
Schlüsselwort gibt an, ob die datenquelle, auf die in einer SELECT
Klausel verwiesen wird, mit Streamingsemantik gelesen werden soll.
Delta Live Tables Quellcode unterscheidet sich kritisch von SQL-Skripts: Delta Live Tables wertet alle Datasetdefinitionen für alle Quellcodedateien aus, die in einer Pipeline konfiguriert sind, und erstellt ein Dataflowdiagramm, bevor Abfragen ausgeführt werden. Die Reihenfolge der Abfragen, die in einem Notizbuch oder Skript angezeigt werden, definiert nicht die Reihenfolge der Ausführung.
Erstellen einer materialisierten Ansicht mit SQL
Im folgenden Codebeispiel wird die grundlegende Syntax zum Erstellen einer materialisierten Ansicht mit SQL veranschaulicht:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Erstellen einer Streamingtabelle mit SQL
Im folgenden Codebeispiel wird die grundlegende Syntax zum Erstellen einer Streamingtabelle mit SQL veranschaulicht:
Hinweis
Nicht alle Datenquellen unterstützen Streaminglesevorgänge, und einige Datenquellen sollten immer mit Streamingsemantik verarbeitet werden.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Laden von Daten aus objektspeicher
Delta Live Tables unterstützt das Laden von Daten aus allen von Azure Databricks unterstützten Formaten. Siehe Datenformatoptionen.
Hinweis
In diesen Beispielen werden Daten verwendet, die unter der /databricks-datasets
automatischen Bereitstellung in Ihrem Arbeitsbereich verfügbar sind. Databricks empfiehlt die Verwendung von Volumepfaden oder Cloud-URIs, um auf daten zu verweisen, die im Cloudobjektspeicher gespeichert sind. Weitere Informationen finden Sie unter Was sind Unity Catalog-Volumes?.
Databricks empfiehlt die Verwendung von AutoLade- und Streamingtabellen beim Konfigurieren von inkrementellen Erfassungsworkloads für Daten, die im Cloudobjektspeicher gespeichert sind. Weitere Informationen finden Sie unter Automatisches Laden.
SQL verwendet die Funktion, um die Funktion zum Aufrufen der read_files
Funktion zum Automatischen Laden aufzurufen. Sie müssen auch das STREAM
Schlüsselwort zum Konfigurieren eines Streaming-Lesevorgangs mit read_files
.
Im folgenden Beispiel wird eine Streamingtabelle aus JSON-Dateien mit dem automatischen Ladeprogramm erstellt:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Die read_files
Funktion unterstützt auch Batchsemantik zum Erstellen materialisierter Ansichten. Im folgenden Beispiel werden Batchsemantik verwendet, um ein JSON-Verzeichnis zu lesen und eine materialisierte Ansicht zu erstellen:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Überprüfen von Daten mit Erwartungen
Sie können die Erwartungen verwenden, um Einschränkungen für die Datenqualität festzulegen und zu erzwingen. Siehe Verwalten der Datenqualität mit Delta Live Tables.
Mit dem folgenden Code wird eine Erwartung definiert valid_data
, die Datensätze abbricht, die während der Datenaufnahme null sind:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Abfragen materialisierter Ansichten und Streamingtabellen, die in Ihrer Pipeline definiert sind
Verwenden Sie das LIVE
Schema, um andere materialisierte Ansichten und Streamingtabellen abzufragen, die in Ihrer Pipeline definiert sind.
Im folgenden Beispiel werden vier Datasets definiert:
- Eine Streamingtabelle,
orders
die JSON-Daten lädt. - Eine materialisierte Ansicht, die CSV-Daten
customers
lädt. - Eine materialisierte Ansicht mit dem Namen
customer_orders
, die Datensätze aus denorders
Datensätzen undcustomers
Datasets verknüpft, den Zeitstempel der Reihenfolge in ein Datum wandelt und diecustomer_id
Felder ,order_number
, undstate
order_date
die Felder auswählt. - Eine materialisierte Ansicht mit dem Namen
daily_orders_by_state
, die die tägliche Anzahl der Bestellungen für jeden Zustand aggregiert.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;