Freigeben über


Tutorial: Ausführen Ihrer ersten Delta Live Tables-Pipeline

Dieses Lernprogramm führt Sie durch die Schritte zum Konfigurieren Ihrer ersten Delta Live Tables-Pipeline, schreiben Sie einfachen ETL-Code, und führen Sie ein Pipelineupdate aus.

Alle Schritte in diesem Lernprogramm sind für Arbeitsbereiche mit aktivierter Unity-Katalog ausgelegt. Sie können auch Delta Live Tables-Pipelines für die Arbeit mit dem älteren Hive-Metaspeicher konfigurieren. Siehe Verwenden von Delta Live Tables-Pipelines mit legacy-Hive-Metaspeicher.

Hinweis

Dieses Lernprogramm enthält Anweisungen zum Entwickeln und Validieren neuer Pipelinecode mithilfe von Databricks-Notizbüchern. Sie können Pipelines auch mithilfe von Quellcode in Python- oder SQL-Dateien konfigurieren.

Sie können eine Pipeline so konfigurieren, dass Der Code ausgeführt wird, wenn Sie bereits Quellcode mit der Syntax delta Live Tables geschrieben haben. Siehe Konfigurieren einer Delta Live Tables-Pipeline.

Sie können die vollständig deklarative SQL-Syntax in Databricks SQL verwenden, um Aktualisierungszeitpläne für materialisierte Ansichten und Streamingtabellen als vom Unity-Katalog verwaltete Objekte zu registrieren und festzulegen. Siehe Verwenden materialisierter Ansichten in Databricks SQL und Laden von Daten mithilfe von Streamingtabellen in Databricks SQL.

Beispiel: Erfassen und Verarbeiten von Babynamen in New York

Im Beispiel in diesem Artikel wird ein öffentlich verfügbares Dataset verwendet, das Datensätze mit New York State Baby Names enthält. In diesem Beispiel wird die Verwendung einer Delta Live Tables-Pipeline veranschaulicht, um:

  • Lesen sie unformatierte CSV-Daten aus einem Volume in eine Tabelle.
  • Lesen Sie die Datensätze aus der Aufnahmetabelle, und verwenden Sie die Erwartungen von Delta Live Tables, um eine neue Tabelle zu erstellen, die bereinigte Daten enthält.
  • Verwenden der bereinigten Datensätze als Eingabe für Delta Live Tables-Abfragen, die abgeleitete Datasets erstellen.

Dieser Code veranschaulicht ein vereinfachtes Beispiel für die Medallion-Architektur. Siehe Worum handelt es sich bei der Medallion- Lakehouse-Architektur?.

Implementierungen dieses Beispiels werden für Python und SQL bereitgestellt. Führen Sie die Schritte zum Erstellen einer neuen Pipeline und eines neuen Notizbuchs aus, und kopieren Sie dann den bereitgestellten Code.

Beispielnotizbücher mit vollständigem Code werden ebenfalls bereitgestellt.

Anforderungen

  • Zum Starten einer Pipeline benötigen Sie die Berechtigung zur Clustererstellung oder Zugriff auf eine Clusterrichtlinie, die einen Delta Live Tables-Cluster definiert. Die Delta Live Tables-Runtime erstellt einen Cluster, bevor die Pipeline ausgeführt wird und schlägt fehl, wenn Sie nicht über die richtige Berechtigung verfügen.

  • Alle Benutzer können Updates standardmäßig mit serverlosen Pipelines auslösen. Serverless muss auf Kontoebene aktiviert sein und ist möglicherweise nicht in Ihrer Arbeitsbereichsregion verfügbar. Weitere Informationen finden Sie unter Aktivieren des serverlosen Computings.

  • In den Beispielen in diesem Lernprogramm wird Unity-Katalog verwendet. Databricks empfiehlt das Erstellen eines neuen Schemas zum Ausführen dieses Lernprogramms, da mehrere Datenbankobjekte im Zielschema erstellt werden.

    • Um ein neues Schema in einem Katalog zu erstellen, müssen ALL PRIVILEGES Sie über berechtigungen oder verfügen USE CATALOG CREATE SCHEMA .
    • Wenn Sie kein neues Schema erstellen können, führen Sie dieses Lernprogramm für ein vorhandenes Schema aus. Sie müssen über die folgenden Berechtigungen verfügen:
      • USE CATALOG für den übergeordneten Katalog.
      • ALL PRIVILEGES oder USE SCHEMA, CREATE MATERIALIZED VIEWund CREATE TABLE Berechtigungen für das Zielschema.
    • In diesem Lernprogramm wird ein Volume zum Speichern von Beispieldaten verwendet. Databricks empfiehlt das Erstellen eines neuen Volumes für dieses Lernprogramm. Wenn Sie ein neues Schema für dieses Lernprogramm erstellen, können Sie ein neues Volume in diesem Schema erstellen.
      • Um ein neues Volume in einem vorhandenen Schema zu erstellen, müssen Sie über die folgenden Berechtigungen verfügen:
        • USE CATALOG für den übergeordneten Katalog.
        • ALL PRIVILEGES oder USE SCHEMA Berechtigungen CREATE VOLUME für das Zielschema.
      • Optional können Sie ein vorhandenes Volume verwenden. Sie müssen über die folgenden Berechtigungen verfügen:
        • USE CATALOG für den übergeordneten Katalog.
        • USE SCHEMA für das übergeordnete Schema.
        • ALL PRIVILEGES oder READ VOLUME auf WRITE VOLUME dem Zielvolume.

    Wenden Sie sich an Ihren Databricks-Administrator, um diese Berechtigungen festzulegen. Weitere Informationen zu Unity-Katalogberechtigungen finden Sie unter Unity Catalog-Berechtigungen und sicherungsfähige Objekte.

Schritt 0: Herunterladen von Daten

In diesem Beispiel werden Daten aus einem Unity-Katalogvolume geladen. Der folgende Code lädt eine CSV-Datei herunter und speichert sie im angegebenen Volume. Öffnen Sie ein neues Notizbuch, und führen Sie den folgenden Code aus, um diese Daten auf das angegebene Volume herunterzuladen:

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

Ersetzen Sie <catalog-name>, <schema-name> und <volume-name> durch die Katalog-, Schema- und Volumenamen für ein Unity Catalog-Volume. Der bereitgestellte Code versucht, das angegebene Schema und das angegebene Volume zu erstellen, wenn diese Objekte nicht vorhanden sind. Sie müssen über die entsprechenden Berechtigungen verfügen, um Objekte im Unity-Katalog zu erstellen und zu schreiben. Siehe Anforderungen.

Hinweis

Stellen Sie sicher, dass dieses Notizbuch erfolgreich ausgeführt wurde, bevor Sie mit dem Lernprogramm fortfahren. Konfigurieren Sie dieses Notizbuch nicht als Teil Der Pipeline.

Schritt 1: Erstellen einer Pipeline

Delta Live Tables erstellt Pipelines, indem Abhängigkeiten aufgelöst werden, die in Notizbüchern oder Dateien (als Quellcode bezeichnet) mithilfe der Syntax von Delta Live Tables definiert sind. Jede Quellcodedatei kann nur eine Sprache enthalten, Sie können jedoch mehrere sprachspezifische Notizbücher oder Dateien in der Pipeline hinzufügen.

Wichtig

Konfigurieren Sie keine Ressourcen im Feld "Quellcode ". Wenn Sie dieses Feld schwarz lassen, wird ein Notizbuch für die Quellcodeerstellung erstellt und konfiguriert.

Die Anweisungen in diesem Lernprogramm verwenden serverlose Compute- und Unity-Katalog. Verwenden Sie die Standardeinstellungen für alle Konfigurationsoptionen, die in diesen Anweisungen nicht erwähnt werden.

Hinweis

Wenn Serverless in Ihrem Arbeitsbereich nicht aktiviert oder unterstützt wird, können Sie das Lernprogramm mit den Standardberechnungseinstellungen abschließen. Sie müssen "Unity-Katalog" unter "Speicheroptionen" im Abschnitt "Ziel" der Benutzeroberfläche "Pipeline erstellen" manuell auswählen.

Gehen Sie wie folgt vor, um eine neue Pipeline zu konfigurieren:

  1. Klicken Sie in der Randleiste auf Delta Live Tables .
  2. Klicken Sie auf "Pipeline erstellen".
  3. Geben Sie einen eindeutigen Pipelinenamen an.
  4. Aktivieren Sie das Kontrollkästchen neben Serverless.
  5. Wählen Sie einen Katalog aus, um Daten zu veröffentlichen.
  6. Wählen Sie ein Schema im Katalog aus.
    • Geben Sie einen neuen Schemanamen an, um ein Schema zu erstellen.
  7. Definieren Sie drei Pipelineparameter mithilfe der Schaltfläche "Konfiguration hinzufügen" unter "Erweitert ", um drei Konfigurationen hinzuzufügen. Geben Sie den Katalog, das Schema und das Volume an, in den Sie Daten mit den folgenden Parameternamen heruntergeladen haben:
    • my_catalog
    • my_schema
    • my_volume
  8. Klicken Sie auf Erstellen.

Die Pipeline-Benutzeroberfläche wird für die neu erstellte Pipeline angezeigt. Ein Quellcode-Notizbuch wird automatisch erstellt und für die Pipeline konfiguriert.

Das Notizbuch wird in einem neuen Verzeichnis in Ihrem Benutzerverzeichnis erstellt. Der Name des neuen Verzeichnisses und der Datei entspricht dem Namen Der Pipeline. Beispiel: /Users/your.username@databricks.com/my_pipeline/my_pipeline.

Ein Link für den Zugriff auf dieses Notizbuch befindet sich im Feld "Quellcode " im Bereich "Pipelinedetails ". Klicken Sie auf den Link, um das Notizbuch zu öffnen, bevor Sie mit dem nächsten Schritt fortfahren.

Schritt 2: Deklarieren materialisierter Ansichten und Streamingtabellen in einem Notizbuch mit Python oder SQL

Sie können Datbricks-Notizbücher verwenden, um Quellcode für Delta Live Tables-Pipelines interaktiv zu entwickeln und zu validieren. Sie müssen Ihr Notizbuch an die Pipeline anfügen, um diese Funktionalität zu verwenden. So fügen Sie Ihr neu erstelltes Notizbuch an die soeben erstellte Pipeline an:

  1. Klicken Sie oben rechts auf "Verbinden" , um das Berechnungskonfigurationsmenü zu öffnen.
  2. Zeigen Sie mit der Maus auf den Namen der Pipeline, die Sie in Schritt 1 erstellt haben.
  3. Klicken Sie auf Verbinden.

Die UI ändert sich, um die Schaltflächen "Überprüfen" und "Start" oben rechts einzuschließen. Weitere Informationen zur Unterstützung von Notizbüchern für die Entwicklung von Pipelinecode finden Sie unter Entwickeln und Debuggen von Delta Live Tables-Pipelines in Notizbüchern.

Wichtig

  • Delta Live Tables Pipelines evaluieren alle Zellen in einem Notizbuch während der Planung. Im Gegensatz zu Notizbüchern, die als Aufträge für alle Zwecke ausgeführt oder geplant werden, garantieren Pipelines nicht, dass Zellen in der angegebenen Reihenfolge ausgeführt werden.
  • Notizbücher können nur eine programmiersprache enthalten. Mischen Sie Python- und SQL-Code nicht in Pipelinequellcode-Notizbüchern.

Ausführliche Informationen zum Entwickeln von Code mit Python oder SQL finden Sie unter Entwickeln von Pipelinecode mit Python oder Entwickeln von Pipelinecode mit SQL.

Beispielpipelinecode

Um das Beispiel in diesem Lernprogramm zu implementieren, kopieren Sie den folgenden Code, und fügen Sie ihn in eine Zelle im Notizbuch ein, die als Quellcode für Ihre Pipeline konfiguriert ist.

Der bereitgestellte Code führt folgende Aktionen aus:

  • Importiert erforderliche Module (nur Python).
  • Referenzparameter, die während der Pipelinekonfiguration definiert sind.
  • Definiert eine Streamingtabelle mit dem Namen baby_names_raw , die von einem Volume aufgenommen wird.
  • Definiert eine materialisierte Ansicht mit dem Namen baby_names_prepared , die erfasste Daten überprüft.
  • Definiert eine materialisierte Ansicht mit dem Namen top_baby_names_2021 , die eine stark verfeinerte Ansicht der Daten enthält.

Python

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

SQL

-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

Schritt 3: Starten eines Pipelineupdates

Um eine Pipelineaktualisierung zu starten, klicken Sie oben rechts auf der Notizbuch-UI auf die Schaltfläche "Start ".

Beispielnotebooks

Die folgenden Notizbücher enthalten die gleichen Codebeispiele in diesem Artikel. Diese Notizbücher haben dieselben Anforderungen wie die Schritte in diesem Artikel. Siehe Anforderungen.

Führen Sie zum Importieren eines Notizbuchs die folgenden Schritte aus:

  1. Öffnen Sie die Benutzeroberfläche des Notizbuchs.
    • Klicken Sie auf +Neues>Notizbuch.
    • Ein leeres Notizbuch wird geöffnet.
  2. Klicken Sie auf Datei>Importieren. Das Dialogfeld Import wird angezeigt.
  3. Wählen Sie die URL-Option für den Import aus.
  4. Fügen Sie die URL des Notizbuchs ein.
  5. Klicken Sie auf Importieren.

Dieses Lernprogramm erfordert, dass Sie ein Dateneinrichtungsnotizbuch ausführen, bevor Sie Ihre Delta Live Tables-Pipeline konfigurieren und ausführen. Importieren Sie das folgende Notizbuch, fügen Sie das Notizbuch an eine Computeressource an, füllen Sie die erforderliche Variable für my_catalog, my_schemaund my_volumeklicken Sie auf "Alle ausführen".

Lernprogramm zum Herunterladen von Daten für Pipelines

Notebook abrufen

Die folgenden Notizbücher enthalten Beispiele in Python oder SQL. Wenn Sie ein Notizbuch importieren, wird es in Ihrem Heimverzeichnis des Benutzers gespeichert.

Führen Sie nach dem Importieren eines der folgenden Notizbücher die Schritte zum Erstellen einer Pipeline aus, verwenden Sie jedoch die Quellcodedateiauswahl , um das heruntergeladene Notizbuch auszuwählen. Nachdem Sie die Pipeline mit einem Notizbuch erstellt haben, das als Quellcode konfiguriert ist, klicken Sie in der Pipeline-UI auf "Start ", um ein Update auszulösen.

Erste Schritte mit dem Delta Live Tables-Python-Notebook

Notebook abrufen

Erste Schritte mit dem Delta Live Tables-SQL-Notebook

Notebook abrufen