使用 Delta Live Tables 載入資料
您可以使用 Delta Live Tables,從 Azure Databricks 上 Apache Spark 所支援的任何資料來源來載入資料。 您可以在 Delta Live Tables 中針對傳回 Spark DataFrame 的任何查詢定義資料集 (資料表和檢視),包括 Spark DataFrame 的串流 DataFrames 和 Pandas。 關於資料擷取工作,Databricks 建議針對大部分的使用案例使用串流資料表。 串流資料表適用於使用自動載入器或 Kafka 等訊息匯流排以從雲端物件儲存體擷取資料。 下列範例示範一些常見的模式。
重要
並非所有數據源都有 SQL 支援。 您可以在 Delta Live Tables 管線中混合 SQL 和 Python 筆記本,以針對擷取以外的所有作業使用 SQL。
如需使用預設未封裝於 Delta Live Tables 中的連結庫的詳細資訊,請參閱 管理 Delta Live Tables 管線的 Python 相依性。
從雲端物件儲存體載入檔案
Databricks 建議針對雲端物件記憶體的大部分數據擷取工作,搭配差異實時數據表使用自動載入器。 自動載入器和 Delta 即時數據表的設計目的是在數據抵達雲端記憶體時,以累加方式和等冪方式載入不斷成長的數據。 下列範例使用自動載入器從 CSV 和 JSON 檔案建立資料集:
注意
若要在啟用 Unity 目錄的管線中,使用自動載入器載入檔案,您必須使用外部位置。 若要深入了解搭配 Delta Live Tables 使用 Unity 目錄,請參閱搭配您的 Delta Live Tables 管線使用 Unity 目錄 (英文)。
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders/", "json")
請參閱 什麼是自動載入器? 和 自動載入器 SQL 語法。
警告
如果您使用自動載入器搭配檔案通知,並針對管線或串流數據表執行完整重新整理,則必須手動清除您的資源。 您可以在筆記本中使用 CloudFilesResourceManager 來執行清除。
從訊息匯流排載入資料
您可以將 Delta Live Tables 管線設定為使用串流數據表從訊息總線內嵌數據。 Databricks 建議結合串流數據表與連續執行和增強的自動調整,以提供最有效率的擷取,以便從訊息總線載入低延遲。 請參閱 使用增強型自動調整來優化差異實時數據表管線的叢集使用率。
例如,下列程式代碼會將串流數據表設定為從 Kafka 擷取資料:
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
您可以在純 SQL 撰寫下游作業,以針對此資料執行串流轉換,如下列範例所示:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.kafka_raw)
WHERE ...
如需使用事件中樞的範例,請參閱使用 Azure 事件中樞 做為差異實時數據表數據源。
請參閱 <設定串流資料來源>。
從外部系統載入資料
Delta Live Tables 支援從 Azure Databricks 所支援的任何數據源載入數據。 請參閱 連線至數據源。 您也可以針對支持的數據源使用 Lakehouse 同盟載入外部數據。 因為 Lakehouse 同盟需要 Databricks Runtime 13.3 LTS 或更新版本,若要使用 Lakehouse 同盟,您必須將管線設定為使用 預覽通道。
某些數據源在 SQL 中沒有對等的支援。 如果您無法搭配其中一個數據源使用 Lakehouse 同盟,您可以使用 Python 筆記本從來源內嵌數據。 您可以將 Python 和 SQL 原始程式碼新增至相同的 Delta Live Tables 管線。 下列範例會宣告具體化檢視,以存取遠端 PostgreSQL 數據表中的數據目前狀態:
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
從雲端物件儲存體載入小型或靜態資料集
您可以使用 Apache Spark 載入語法來載入小型或靜態數據集。 Delta Live Tables 支援 Azure Databricks 上 Apache Spark 支援的所有文件格式。 如需完整清單,請參閱 數據格式選項。
下列範例示範載入 JSON 以建立 Delta Live Tables 數據表:
Python
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
SQL
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
注意
SELECT * FROM format.`path`;
SQL 建構適用於所有 Azure Databricks 上的 SQL 環境。 建議使用 SQL 搭配 Delta Live Tables 進行直接檔案存取的模式。
使用管線中的秘密安全地存取記憶體認證
您可以使用 Azure Databricks 秘密 來儲存認證,例如存取密鑰或密碼。 若要在管線中設定秘密,請在管線設定叢集組態中使用Spark屬性。 請參閱 設定 Delta Live Tables 管線的計算。
下列範例會使用秘密來儲存使用自動載入器從 Azure Data Lake Storage Gen2 (ADLS Gen2) 記憶體帳戶讀取輸入數據所需的存取密鑰。 您可以使用這個相同的方法來設定管線所需的任何秘密,例如 AWS 密鑰來存取 S3 或 Apache Hive 中繼存放區的密碼。
若要深入瞭解如何使用 Azure Data Lake Storage Gen2,請參閱 聯機到 Azure Data Lake Storage Gen2 和 Blob 記憶體。
注意
您必須將 spark.hadoop.
前置詞新增至 spark_conf
設定秘密值的組態金鑰。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
Replace
<storage-account-name>
具有ADLS Gen2記憶體帳戶名稱。- 具有 Azure Databricks 祕密範圍名稱的
<scope-name>
。 - 具有包含 Azure 儲存體帳戶存取金鑰之金鑰名稱的
<secret-name>
。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
Replace
<container-name>
使用儲存輸入數據的 Azure 記憶體帳戶容器名稱。<storage-account-name>
具有ADLS Gen2記憶體帳戶名稱。<path-to-input-dataset>
具有輸入數據集的路徑。
從Azure 事件中樞載入數據
Azure 事件中樞 是提供 Apache Kafka 相容介面的數據串流服務。 您可以使用 Delta Live Tables 執行時間中包含的結構化串流 Kafka 連接器,從 Azure 事件中樞 載入訊息。 若要深入瞭解如何從 Azure 事件中樞 載入和處理訊息,請參閱使用 Azure 事件中樞 作為差異實時數據表數據源。