使用合併將Upsert插入 Delta Lake 資料表
您可以使用 SQL 作業,將源數據表、檢視或 DataFrame 中的數據向上插入目標 Delta 數據表 MERGE
。 Delta Lake 支援 在 中 MERGE
插入、更新和刪除 ,並支援超越 SQL 標準的擴充語法,以利進階使用案例。
假設您有名為 people10mupdates
的源數據表或 來源 /tmp/delta/people-10m-updates
路徑,其中包含名為 people10m
的目標數據表的新數據,或位於 /tmp/delta/people-10m
的目標路徑。 其中一些新記錄可能已存在於目標數據中。 若要合併新數據,您想要更新人員 id
已經存在的數據列,並插入沒有相符 id
專案的新數據列。 您可以執行下列查詢:
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
重要
只有源數據表中的單一數據列可以比對目標數據表中的指定數據列。 在 Databricks Runtime 16.0 和更新版本中,MERGE
評估 和 ON
子句中指定的WHEN MATCHED
條件,以判斷重複的相符專案。 在 Databricks Runtime 15.4 LTS 和以下 MERGE
,作業只會考慮 子句中指定的 ON
條件。
如需 Scala 和 Python 語法詳細數據, 請參閱 Delta Lake API 檔 。 如需 SQL 語法詳細數據,請參閱 MERGE INTO
使用合併修改所有不相符的數據列
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 和更新版本中,您可以使用 WHEN NOT MATCHED BY SOURCE
子句在UPDATE
DELETE
源數據表中沒有對應記錄的目標數據表或記錄。 Databricks 建議新增選擇性條件子句,以避免完全重寫目標數據表。
下列程式代碼範例示範使用這個 進行刪除的基本語法、以源數據表的內容覆寫目標數據表,以及刪除目標數據表中不相符的記錄。 如需來源更新和刪除時間系結之數據表的更可調整模式,請參閱 以累加方式同步差異數據表與來源。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
下列範例會將條件新增至 子句, WHEN NOT MATCHED BY SOURCE
並指定要在不相符的目標數據列中更新的值。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
合併作業語意
以下是程式設計作業語意的詳細描述 merge
。
可以有任意數目的
whenMatched
和whenNotMatched
子句。whenMatched
當源數據列根據比對條件符合目標數據表數據列時,就會執行 子句。 這些子句具有下列語意。whenMatched
子句最多可以有一個update
和一個delete
動作。update
中的merge
動作只會更新相符目標數據列的指定數據行(類似於update
作業)。 動作delete
會刪除相符的數據列。每個
whenMatched
子句都可以有選擇性條件。 如果這個子句條件存在,則只有在子句條件為 true 時,update
才會針對任何相符的來源目標數據列組執行 或delete
動作。如果有多個
whenMatched
子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenMatched
子句都必須有條件。whenMatched
如果符合合併條件的來源和目標數據列組沒有評估為 true,則目標數據列會保持不變。若要使用來源資料集的對應資料列來更新目標 Delta 資料表的所有資料列,請使用
whenMatched(...).updateAll()
。 這相當於:whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
針對目標 Delta 資料表的所有數據行。 因此,此動作假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。
注意
啟用自動架構演進時,此行為會變更。 如需詳細資訊,請參閱 自動架構演進 。
whenNotMatched
當來源數據列不符合任何以比對條件為基礎的目標數據列時,就會執行 子句。 這些子句具有下列語意。whenNotMatched
子句只能insert
有 動作。 新的數據列會根據指定的數據行和對應的表達式產生。 您不需要指定目標資料表中的所有資料列。 針對未指定的目標資料列,NULL
會插入 。每個
whenNotMatched
子句都可以有選擇性條件。 如果子句條件存在,則只有在該數據列的條件為 true 時,才會插入源數據列。 否則,會忽略源數據行。如果有多個
whenNotMatched
子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenNotMatched
子句都必須有條件。若要使用來源資料集的對應資料列插入目標 Delta 資料表的所有資料列,請使用
whenNotMatched(...).insertAll()
。 這相當於:whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
針對目標 Delta 資料表的所有數據行。 因此,此動作假設源數據表的數據行與目標數據表中的數據行相同,否則查詢會擲回分析錯誤。
注意
啟用自動架構演進時,此行為會變更。 如需詳細資訊,請參閱 自動架構演進 。
whenNotMatchedBySource
當目標數據列不符合根據合併條件的任何源數據列時,就會執行 子句。 這些子句具有下列語意。whenNotMatchedBySource
子句可以指定delete
和update
動作。- 每個
whenNotMatchedBySource
子句都可以有選擇性條件。 如果子句條件存在,只有當該數據列的條件為 true 時,才會修改目標數據列。 否則,目標數據列會維持不變。 - 如果有多個
whenNotMatchedBySource
子句,則會依照指定的順序來評估它們。 除了最後一個子句之外,所有whenNotMatchedBySource
子句都必須有條件。 - 根據定義,
whenNotMatchedBySource
子句沒有從中提取數據行值的來源數據列,因此無法參考源數據行。 若要修改每個資料列,您可以指定常值或對目標資料行執行動作,例如SET target.deleted_count = target.deleted_count + 1
。
重要
merge
如果源數據集的多個數據列相符,且合併嘗試更新目標 Delta 數據表的相同數據列,作業可能會失敗。 根據合併的 SQL 語意,這類更新作業模棱兩可,因為不清楚應該使用哪個來源數據列來更新相符的目標數據列。 您可以預先處理源數據表,以排除多個相符專案的可能性。- 只有當檢視已定義為
CREATE VIEW viewName AS SELECT * FROM deltaTable
時,才可以在 SQL VIEW 上套用 SQLMERGE
作業。
寫入 Delta 資料表時重複資料刪除
常見的 ETL 使用案例是將記錄附加至數據表,以將記錄收集至 Delta 數據表。 不過,通常來源可能會產生重複的記錄檔記錄,而需要下游重複數據刪除步驟才能處理它們。 使用 merge
時,您可以避免插入重複的記錄。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注意
包含新記錄的數據集必須在本身內重複數據刪除。 藉由合併的 SQL 語意,它會比對和重複數據刪除新的資料與數據表中的現有數據,但如果新數據集內有重複的數據,則會插入。 因此,重複數據刪除新的資料之後再合併至數據表。
如果您知道只有幾天可能會取得重複的記錄,您可以藉由依日期分割數據表,然後指定要比對的目標數據表日期範圍,進一步優化查詢。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
這比上一個命令更有效率,因為它只會在過去 7 天的記錄中尋找重複專案,而不是整個數據表。 此外,您可以使用這個僅限插入的合併與結構化串流來執行記錄連續重複數據刪除。
- 在串流查詢中,您可以使用 中的
foreachBatch
合併作業,以持續將任何串流數據寫入重複數據刪除至 Delta 資料表。 如需 的詳細資訊foreachBatch
,請參閱下列串流範例。 - 在另一個串流查詢中,您可以持續讀取此 Delta 資料表的重複資料刪除資料。 這是可能的,因為僅插入合併只會將新數據附加至 Delta 數據表。
使用 Delta Lake 緩時變資料 (SCD) 和異動資料擷取 (CDC)
Delta Live Tables 原生支持追蹤和套用 SCD 類型 1 和 Type 2。 與 Delta Live Tables 搭配使用 APPLY CHANGES INTO
,以確保處理 CDC 摘要時,會正確處理順序不足的記錄。 請參閱套用變更 API:使用差異即時資料表簡化異動資料擷取。
以累加方式同步差異數據表與來源
在 Databricks SQL 和 Databricks Runtime 12.2 LTS 和更新版本中,您可以使用 WHEN NOT MATCHED BY SOURCE
來建立任意條件,以不可部分完成刪除和取代數據表的一部分。 當您有源數據表時,記錄在初始數據輸入之後可能會變更或刪除數天,但最終會確定為最終狀態時,這特別有用。
下列查詢顯示使用此模式從來源選取 5 天的記錄、更新目標中的相符記錄、將來源的新記錄插入目標,以及從目標中刪除過去 5 天的所有不相符記錄。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
藉由在來源和目標數據表上提供相同的布爾篩選,您就可以動態地將變更從來源傳播到目標數據表,包括刪除。
注意
雖然此模式可以在沒有任何條件子句的情況下使用,但這會導致完全重寫可能很昂貴的目標數據表。