在 Databricks SQL 中使用串流 tables 載入數據
Databricks 建議使用串流 tables 並使用 Databricks SQL 匯入數據。 串流 table 是註冊至 Unity Catalog 的 table,可額外支援串流或增量數據處理。 Delta Live Tables 管線會自動建立,以處理每個串流 table。 您可以從 Kafka 和雲端物件儲存使用串流 tables 載入增量資料。
本文示範如何使用串流 tables,從設定為 Unity Catalog 磁碟區或外部位置的雲端物件記憶體載入數據。
注意
若要瞭解如何使用 Delta Lake tables 作為串流來源和接收,請參閱 Delta table 串流讀取和寫入。
重要
Databricks SQL 中建立的串流 tables 是由無伺服器化的 Delta Live Tables 管線所提供支持。 您的工作區必須支持無伺服器管線才能使用這項功能。
開始之前
在開始之前,您必須符合下列需求。
工作區需求:
- 已啟用無伺服器的 Azure Databricks 帳戶。 如需更多資訊,請參閱啟用無伺服器 SQL 倉儲。
- 已啟用 Unity Catalog 的工作區。 如需詳細資訊,請參閱 Set 設置與管理 Unity Catalog。
計算需求:
您可以使用下列其中一項:
使用
Current
通道的 SQL 倉儲。在 Databricks Runtime 13.3 LTS 或以上版本上以共用存取模式計算。
在 Databricks Runtime 15.4 LTS 或更新版本上使用單一使用者存取模式進行計算。
Databricks Runtime 15.3 和以下版本上,您無法使用單一使用者計算來查詢 其他使用者所擁有的串流 tables。 只有在您擁有串流 table的情況下,您才能在 Databricks Runtime 15.3 及更早版本中使用單一用戶計算。 table 的創作者是持有人。
無論 table 擁有權如何,Databricks Runtime 15.4 LTS 和更高版本均支援在單一使用者計算上查詢由 Delta Live Tables產生的 tables。 若要利用 Databricks Runtime 15.4 LTS 和更新版本所提供的數據篩選,您必須確認 您的工作區已啟用無伺服器計算,因為支援 Delta Live Tablestables 產生的數據篩選功能會在無伺服器計算上執行。 當您使用單一使用者計算來執行數據篩選作業時,可能會向無伺服器計算資源收費。 請參閱 單一用戶計算的細微訪問控制。
權限需求
- Unity Catalog 外部位置上的
READ FILES
許可權。 如需相關資訊,請參閱建立外部位置以將雲端儲存連線到 Azure Databricks。 -
catalog 上的
USE CATALOG
許可權,您在其中建立串流 table。 - 您在 schema 上擁有的
USE SCHEMA
許可權,建立串流 table。 - 您在建立串流 table的 schema 上擁有之
CREATE TABLE
許可權。
其他需求:
來源資料的路徑。
磁碟區路徑範例:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
外部位置路徑範例:
abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis
注意
本文假設您想要載入的數據位於雲端儲存位置,其對應至您有權存取的 Unity Catalog 磁碟區或外部位置。
探索和預覽來源資料
在工作區的側邊欄中,按一下查詢,然後按一下建立查詢。
在查詢編輯器中,select 使用下拉式 list
Current
通道的 SQL 倉儲。將下列內容貼到編輯器中,以角括弧(
<>
)取代 values,以識別源數據的資訊,然後按兩下 [執行]。注意
在執行
read_files
table 值函式時,如果函式的預設值無法剖析您的數據,您可能會遇到 schema 推斷錯誤。 例如,您可能需要為多行 CSV 或 JSON 檔案設定多行模式。 如需查看剖析器選項的 list,請參閱 read_files table-valued 函式。/* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
將數據載入串流 table
若要從雲端物件儲存體中的數據建立串流 table,請將下列內容貼到查詢編輯器中,然後按一下 執行 :
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')
Set 運行時間通道
使用 SQL 倉儲建立的串流 tables,會使用 Delta Live Tables 管線自動重新整理。 Delta Live Tables 管線預設會在 current
通道中使用執行環境。 請參閱 Delta Live Tables 發行說明和升級流程 以了解版本釋出流程。
Databricks 建議針對生產工作負載使用 current
通道。 新功能會先發行至 preview
通道。 您可以將管線 set 發送至預覽 Delta Live Tables 通道,並通過指定 preview
作為 table 屬性來測試新功能。 當您使用 ALTER 語句建立 table 或建立 table 之後,可以指定這個屬性。
下列程式代碼範例示範如何在 CREATE 語句中 set 要預覽的通道:
CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
*
FROM
range(5)
使用 DLT 管線進行 Refresh 的串流 table
本節說明使用查詢中所定義來源中可用的最新數據,重新整理串流 table 模式。
當您 CREATE
或 REFRESH
串流 table時,update 會使用無伺服器 Delta Live Tables 管線來處理。 您定義的每個串流 table 都有相關聯的 Delta Live Tables 管道。
執行 REFRESH
命令之後,就會傳回 DLT 管線連結。 您可以使用 DLT 管線連結來檢查 refresh的狀態。
注意
只有 table 擁有者可以 refresh 傳輸 table 來 get 最新數據。 建立 table 的用戶是擁有者,且無法變更擁有者。 您可能需要先 refresh 串流 table,才能使用 時間移動 查詢。
僅導入新資料
根據預設,read_files
函式會在建立 table 期間讀取源目錄中的所有現有數據,然後使用每個 refresh處理新抵達的記錄。
為了避免在建立 table 時擷取來源目錄中已存在的數據,請使用 setincludeExistingFiles
選項來實現 false
。 這表示只有在 table 建立之後抵達目錄的數據會被處理。 例如:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
includeExistingFiles => false)
完全 refresh 串流 table
完全重新整理會以最新的定義重新處理來源中所有可用的資料。 不建議對無法保留完整數據歷史記錄或保留期較短(例如 Kafka)的來源進行完整重新整理,因為執行完整 refresh 會截斷現有的數據。 如果資料來源中的資料不再可供使用,您可能無法復原舊資料。
例如:
REFRESH STREAMING TABLE my_bronze_table FULL
排程自動 refresh 的串流 table
若要設定串流
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
例如 refresh 排程查詢,請參閱 ALTER STREAMING TABLE。
追蹤 refresh 的狀態
您可以透過在 Delta Live Tables UI 中檢視管理串流 table 的管線,或者檢視 DESCRIBE EXTENDED
命令針對串流 table傳回的 Refresh 資訊,來查看串流 tablerefresh 的狀態。
DESCRIBE EXTENDED <table-name>
從 Kafka 串流擷取
如需從 Kafka 串流擷取的範例,請參閱 read_kafka。
Grant 使用者可存取串流 table
若要給 grant 使用者在串流 table 上的 SELECT
許可權以便查詢,請將下列內容貼上至查詢編輯器中,然後按一下 執行。
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
如需在 Unity Catalog 可保護物件上授與許可權的詳細資訊,請參閱 Unity Catalog 許可權和可保護物件。
使用查詢歷程記錄監視執行
您可以使用 [查詢歷程記錄] 頁面來存取查詢詳細數據和查詢配置檔,以協助您在用來執行串流 table 更新的 Delta Live Tables 管線中識別效能不佳的查詢和瓶頸。 如需查詢歷程記錄和查詢配置檔中可用資訊種類的概觀,請參閱 查詢歷程記錄 和 查詢配置檔。
重要
這項功能處於公開預覽狀態。 工作區管理員可以從 [預覽 ] 頁面啟用這項功能。 請參閱管理 Azure Databricks 預覽版。
與串流 tables 相關的所有語句都會出現在查詢記錄中。 您可以使用 陳述 下拉篩選器來 select 任何命令,並檢查相關查詢。 所有 CREATE
語句之後,都會接著在 Delta Live Tables 管線上以異步方式執行的 REFRESH
語句。 這些 REFRESH
語句通常包含詳細的查詢計劃,以提供優化效能的見解。
若要存取 REFRESH
查詢歷程記錄 UI 中的語句,請使用下列步驟:
- 按兩下 左側提要欄位以開啟 [查詢歷程記錄 ] UI。
- 從 [語句] 的下拉式篩選中選擇 REFRESH 的 Select 複選框。
- 按兩下查詢語句的名稱,即可檢視摘要詳細數據,例如查詢的持續時間和匯總計量。
- 按兩下 [ 查看查詢設定檔 ] 以開啟查詢設定檔。 如需巡覽查詢配置檔的詳細資訊,請參閱 查詢配置檔 。
- 您可以選擇性地使用 [查詢來源] 區段中的連結來開啟相關的查詢或管線。
您也可以使用 SQL 編輯器中的連結,或從附加至 SQL 倉儲的筆記本存取查詢詳細數據。
注意
您的串流 table 必須設定為通過 預覽 頻道來運行。 請參閱 Set 運行時通道。