Importieren von Python-Modulen aus Git-Ordnern oder Arbeitsbereichsdateien
Sie können Python-Code in einem Databricks-Git-Ordner oder in Arbeitsbereichsdateien speichern und diesen Python-Code dann in Ihre Delta Live Tables-Pipelines importieren. Weitere Informationen zum Verwenden von Modulen in Git-Ordnern oder Arbeitsbereichsdateien finden Sie unter Arbeiten mit Python- und R-Modulen.
Hinweis
Sie können keinen Quellcode aus einem Notebook importieren, das in einem Databricks-Git-Ordner oder einer Arbeitsbereichsdatei gespeichert ist. Fügen Sie das Notebook stattdessen direkt hinzu, wenn Sie eine Pipeline erstellen oder bearbeiten. Siehe Konfigurieren einer Delta Live Tables-Pipeline.
Importieren eines Python-Moduls in eine Delta Live Tables-Pipeline
Im folgenden Beispiel wird das Importieren von Datasetabfragen als Python-Module aus Arbeitsbereichsdateien veranschaulicht. Obwohl in diesem Beispiel die Verwendung von Arbeitsbereichsdateien zum Speichern des Pipelinequellcodes beschrieben wird, können Sie es auch mit Quellcode verwenden, der in einem Git-Ordner gespeichert ist.
Gehen Sie folgendermaßen vor, um dieses Beispiel auszuführen:
Klicken Sie auf der Seitenleiste Ihres Azure Databricks-Arbeitsbereichs auf Arbeitsbereich, um den Arbeitsbereichsbrowser zu öffnen.
Verwenden Sie den Arbeitsbereichsbrowser, um ein Verzeichnis für die Python-Module auszuwählen.
Klicken Sie im ausgewählten Verzeichnis in der Spalte ganz rechts auf und dann auf Erstellen Datei>.
Geben Sie einen Namen für die Datei ein, z. B.
clickstream_raw_module.py
. Der Datei-Editor wird geöffnet. Um ein Modul zum Lesen von Quelldaten in einer Tabelle zu erstellen, geben Sie Folgendes im Editorfenster ein:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Um ein Modul zu erstellen, das eine neue Tabelle mit vorbereiteten Daten erstellt, erstellen Sie eine neue Datei im selben Verzeichnis, geben Sie einen Namen für die Datei ein (z. B.
clickstream_prepared_module.py
), und geben Sie Folgendes in das neue Editorfenster ein:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Erstellen Sie als Nächstes ein Pipeline-Notebook. Wechseln Sie zur Landing Page von Azure Databricks. Wählen Sie Notebook erstellen aus, oder klicken Sie auf der Seitenleiste auf Neu, und wählen Sie Notebook aus. Sie können das Notebook auch im Arbeitsbereichsbrowser erstellen, indem Sie auf und dann auf Erstellen > Notebook klicken.
Geben Sie einen Namen für das Notebook ein, bestätigen Sie, dass Python die Standardsprache ist.
Klicken Sie auf Erstellen.
Geben Sie den Beispielcode in das Notebook ein.
Hinweis
Wenn Ihr Notebook Module oder Pakete aus einem Arbeitsbereichsdateipfad oder einem Git-Ordnerpfad importiert, der sich vom Notebook-Verzeichnis unterscheidet, müssen Sie den Pfad manuell mithilfe von
sys.path.append()
an die Dateien anfügen.Wenn Sie eine Datei aus einem Git-Ordner importieren, muss dem Pfad vorangestellt
/Workspace/
sein. Beispiel:sys.path.append('/Workspace/...')
. Wenn/Workspace/
aus dem Pfad weggelassen wird, tritt ein Fehler auf.Wenn die Module oder Pakete im selben Verzeichnis wie das Notebook gespeichert sind, müssen Sie den Pfad nicht manuell anfügen. Außerdem müssen Sie den Pfad beim Importieren aus dem Stammverzeichnis eines Git-Ordners nicht manuell anfügen, da das Stammverzeichnis automatisch an den Pfad angefügt wird.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("LIVE.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Ersetzen Sie
<module-path>
durch den Pfad zum Verzeichnis, das die zu importierenden Python-Module enthält.Erstellen Sie eine Pipeline mithilfe des neuen Notebooks.
Klicken Sie zum Ausführen der Pipeline auf der Seite Pipelinedetails auf Start.
Sie können Python-Code auch als Paket importieren. Der folgende Codeschnipsel aus einem Delta Live Tables-Notebook importiert das test_utils
-Paket aus dem dlt_packages
-Verzeichnis im selben Verzeichnis wie das Notebook. Das dlt_packages
-Verzeichnis enthält die Dateien test_utils.py
und __init__.py
, und test_utils.py
definiert die Funktion create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)