常見的資料載入模式
自動載入器可簡化許多常見的數據擷取工作。 此快速參考提供數種熱門模式的範例。
使用 Glob 模式篩選目錄或檔案
Glob 模式可用於在路徑中提供時篩選目錄和檔案。
模式 | 描述 |
---|---|
? |
比對任何單一字元 |
* |
比對零或多個字元 |
[abc] |
比對字元集 {a,b,c} 的單一字元。 |
[a-z] |
比對字元範圍 {a... 中的單一字元...z}. |
[^a] |
比對不是字元集或範圍 {a} 的單一字元。 請注意, ^ 字元必須緊接在左括弧右邊。 |
{ab,cd} |
比對字串集 {ab, cd} 中的字串。 |
{ab,c{de, fh}} |
比對字串集 {ab, cde, cfh} 中的字串。 |
path
使用 來提供前置詞模式,例如:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
重要
您必須使用 選項 pathGlobFilter
來明確提供後綴模式。 path
只會提供前置詞篩選條件。
例如,如果您想要只 png
剖析包含具有不同後綴之檔案的目錄中的檔案,您可以執行下列動作:
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
注意
自動載入器的預設擷取行為與其他Spark檔案來源的預設行為不同。 將 新增 .option("cloudFiles.useStrictGlobber", "true")
至您的讀取,以使用符合預設 Spark 行為與檔案來源的 Globbing。 如需有關 Globbing 的詳細資訊,請參閱下表:
模式 | 檔案路徑 | 預設 globber | Strict globber |
---|---|---|---|
/a/b | /a/b/c/file.txt | 是 | 是 |
/a/b | /a/b_dir/c/file.txt | 否 | 否 |
/a/b | /a/b.txt | 否 | 否 |
/a/b/ | /a/b.txt | 否 | 否 |
/a/*/c/ | /a/b/c/file.txt | 是 | 是 |
/a/*/c/ | /a/b/c/d/file.txt | 是 | 是 |
/a/*/c/ | /a/b/x/y/c/file.txt | 是 | 否 |
/a/*/c | /a/b/c_file.txt | 是 | 否 |
/a/*/c/ | /a/b/c_file.txt | 是 | 否 |
/a/*/c/ | /a/*/cookie/file.txt | 是 | 否 |
/a/b* | /a/b.txt | 是 | 是 |
/a/b* | /a/b/file.txt | 是 | 是 |
/a/{0.txt,1.txt} | /a/0.txt | 是 | 是 |
/a/*/{0.txt,1.txt} | /a/0.txt | 否 | 否 |
/a/b/[cde-h]/i/ | /a/b/c/i/file.txt | 是 | 是 |
啟用簡單的 ETL
將數據放入 Delta Lake 而不遺失任何數據的簡單方式,就是使用下列模式,並使用自動載入器啟用架構推斷。 Databricks 建議在 Azure Databricks 作業中執行下列程式代碼,以在源數據的架構變更時自動重新啟動串流。 根據預設,架構會推斷為字串類型、任何剖析錯誤(如果所有項目維持為字串)都會移至 _rescued_data
,而且任何新的數據行都會失敗數據流並演進架構。
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
防止數據在結構良好的數據中遺失
當您知道架構,但想要知道何時收到非預期的數據時,Databricks 建議使用 rescuedDataColumn
。
Python
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
如果您想要讓串流在引進不符合架構的新欄位時停止處理,您可以新增:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
啟用彈性的半結構化數據管線
當您從引進新數據行的廠商接收數據時,可能無法完全瞭解其執行時,或您可能沒有頻寬來更新數據管線。 您現在可以利用架構演進來重新啟動數據流,並讓自動載入器自動更新推斷的架構。 您也可以利用 schemaHints
廠商可能提供的一些「無架構」欄位。
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
轉換巢狀 JSON 數據
因為自動載入器會將最上層 JSON 數據行推斷為字串,所以您可以使用需要進一步轉換的巢狀 JSON 物件留下。 您可以使用 半結構化數據存取 API 來進一步轉換複雜的 JSON 內容。
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
推斷巢狀 JSON 數據
當您有巢狀數據時,可以使用 cloudFiles.inferColumnTypes
選項來推斷數據和其他數據行類型的巢狀結構。
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
載入不含標頭的 CSV 檔案
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
在具有標頭的 CSV 檔案上強制執行架構
Python
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
Scala
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
將影像或二進位數據內嵌至 Delta Lake for ML
將數據儲存在 Delta Lake 中之後,您就可以對數據執行分散式推斷。 請參閱 使用 pandas UDF 執行分散式推斷。
Python
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
Scala
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
DLT 的自動載入器語法
Delta Live Tables 為自動載入器提供稍微修改的 Python 語法,可新增自動載入器的 SQL 支援。
下列範例使用自動載入器從 CSV 和 JSON 檔案建立資料集:
Python
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
SQL
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
您可以搭配自動載入器使用支援 的格式選項 。 您可以使用 函式 map()
,將選項傳遞至 cloud_files()
方法。 選項是索引鍵/值組,其中索引鍵和值是字串。 以下描述在 SQL 中使用自動載入器的語法:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
FROM cloud_files(
"<file-path>",
"<file-format>",
map(
"<option-key>", "<option_value",
"<option-key>", "<option_value",
...
)
)
下列範例會從具有標頭的索引標籤分隔 CSV 檔案讀取資料:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
您可以使用 schema
手動指定格式;您必須指定 schema
不支援 架構推斷的格式:
Python
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
SQL
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
注意
Delta Live Tables 會在使用自動載入器讀取檔案時,自動設定和管理架構和檢查點目錄。 不過,如果您手動設定其中一個目錄,則執行完整重新整理不會影響已設定目錄的內容。 Databricks 建議使用自動設定的目錄,以避免在處理期間發生非預期的副作用。