변경 내용 적용 API: Delta Live Tables를 사용하여 변경 데이터 캡처 간소화
델타 라이브 테이블은 및 APPLY CHANGES FROM SNAPSHOT
API를 사용하여 CDC(변경 데이터 캡처)를 APPLY CHANGES
간소화합니다. 사용하는 인터페이스는 변경 데이터의 원본에 따라 달라집니다.
- CDF(변경 데이터 피드)의 변경 내용을 처리하는 데 사용합니다
APPLY CHANGES
. - (공개 미리 보기)를 사용하여
APPLY CHANGES FROM SNAPSHOT
데이터베이스 스냅샷의 변경 내용을 처리합니다.
이전에는 이 MERGE INTO
문이 Azure Databricks에서 CDC 레코드를 처리하는 데 일반적으로 사용되었습니다. 그러나 MERGE INTO
순서가 잘못된 레코드로 인해 잘못된 결과를 생성하거나 레코드를 다시 정렬하기 위해 복잡한 논리가 필요할 수 있습니다.
APPLY CHANGES
API는 Delta Live Tables SQL 및 Python 인터페이스에서 지원됩니다. APPLY CHANGES FROM SNAPSHOT
API는 Delta Live Tables Python 인터페이스에서 지원됩니다.
SCD APPLY CHANGES
유형 1 및 APPLY CHANGES FROM SNAPSHOT
형식 2를 사용하여 테이블 업데이트를 지원합니다.
- SCD 유형 1을 사용하여 레코드를 직접 업데이트합니다. 업데이트된 레코드의 기록은 보존되지 않습니다.
- SCD 형식 2를 사용하여 모든 업데이트 또는 지정된 열 집합에 대한 업데이트에서 레코드 기록을 유지합니다.
구문 및 기타 참조는 다음을 참조하세요.
참고 항목
이 문서에서는 원본 데이터의 변경 내용을 기반으로 Delta Live Tables 파이프라인의 테이블을 업데이트하는 방법을 설명합니다. Delta 테이블에 대한 행 수준 변경 정보를 기록하고 쿼리하는 방법을 알아보려면 Azure Databricks에서 Delta Lake 변경 데이터 피드 사용을 참조하세요.
요구 사항
CDC API를 사용하려면 서버리스 DLT 파이프라인 또는 델타 라이브 테이블 Pro
또는 Advanced
버전을 사용하도록 파이프라인을 구성해야 합니다.
CDC는 API를 사용하여 어떻게 구현되는가 APPLY CHANGES
?
Delta Live Tables의 API는 순서가 잘못된 레코드 APPLY CHANGES
를 자동으로 처리하여 CDC 레코드의 올바른 처리를 보장하고 시퀀스 외부 레코드를 처리하기 위한 복잡한 논리를 개발할 필요가 없습니다. 원본 데이터에서 레코드를 시퀀싱할 열을 지정해야 합니다. 이 열은 Delta Live Tables가 원본 데이터의 적절한 순서를 단조적으로 증가시키는 표현으로 해석합니다. Delta Live Tables는 순서가 잘못 도착하는 데이터를 자동으로 처리합니다. SCD 형식 2 변경의 경우 Delta Live Tables는 적절한 시퀀싱 값을 대상 테이블 __START_AT
및 __END_AT
열에 전파합니다. 각 시퀀싱 값에는 키당 하나의 고유 업데이트가 있어야 하며 NULL 시퀀싱 값은 지원되지 않습니다.
CDC 처리를 APPLY CHANGES
수행하려면 먼저 스트리밍 테이블을 만든 다음 SQL의 문 또는 Python의 apply_changes()
함수를 사용하여 APPLY CHANGES INTO
변경 피드에 대한 원본, 키 및 시퀀싱을 지정합니다. 대상 스트리밍 테이블을 만들려면 SQL의 CREATE OR REFRESH STREAMING TABLE
문 또는 Python의 함수를 create_streaming_table()
사용합니다. SCD 유형 1 및 형식 2 처리 예제를 참조하세요.
구문 세부 정보는 Delta Live Tables SQL 참조 또는 Python 참조를 참조하세요.
CDC는 API를 사용하여 어떻게 구현되는가 APPLY CHANGES FROM SNAPSHOT
?
Important
APPLY CHANGES FROM SNAPSHOT
API는 공개 미리 보기로 제공됩니다.
APPLY CHANGES FROM SNAPSHOT
는 일련의 순차 스냅샷을 비교하여 원본 데이터의 변경 내용을 효율적으로 결정한 다음 스냅샷에서 레코드의 CDC 처리에 필요한 처리를 실행하는 선언적 API입니다. APPLY CHANGES FROM SNAPSHOT
는 Delta Live Tables Python 인터페이스에서만 지원됩니다.
APPLY CHANGES FROM SNAPSHOT
에서는 여러 원본 형식에서 스냅샷 수집을 지원합니다.
- 정기적인 스냅샷 수집을 사용하여 기존 테이블 또는 뷰에서 스냅샷을 수집합니다.
APPLY CHANGES FROM SNAPSHOT
에는 기존 데이터베이스 개체에서 정기적으로 스냅샷을 수집하도록 지원하는 간단하고 간소화된 인터페이스가 있습니다. 새 스냅샷은 각 파이프라인 업데이트와 함께 수집되며 수집 시간은 스냅샷 버전으로 사용됩니다. 파이프라인이 연속 모드로 실행되면 APPLY CHANGES FROM SNAPSHOT 처리를 포함하는 흐름에 대한 트리거 간격 설정에 의해 결정되는 기간에 각 파이프라인 업데이트와 함께 여러 스냅샷이 수집됩니다. - 기록 스냅샷 수집을 사용하여 Oracle 또는 MySQL 데이터베이스 또는 데이터 웨어하우스에서 생성된 스냅샷과 같은 데이터베이스 스냅샷이 포함된 파일을 처리합니다.
원본 형식 APPLY CHANGES FROM SNAPSHOT
에서 CDC 처리를 수행하려면 먼저 스트리밍 테이블을 만든 다음 Python의 함수를 사용하여 apply_changes_from_snapshot()
처리를 구현하는 데 필요한 스냅샷, 키 및 기타 인수를 지정합니다. 주기적인 스냅샷 수집 및 기록 스냅샷 수집 예제를 참조하세요.
API에 전달된 스냅샷은 버전별로 오름차순이어야 합니다. Delta Live Tables에서 순서가 다른 스냅샷을 검색하면 오류가 throw됩니다.
구문 세부 정보는 Delta Live Tables Python 참조를 참조하세요.
제한 사항
시퀀싱에 사용되는 열은 정렬 가능한 데이터 형식이어야 합니다.
예: CDF 원본 데이터를 사용하여 SCD 형식 1 및 SCD 형식 2 처리
다음 섹션에서는 변경 데이터 피드의 원본 이벤트를 기반으로 대상 테이블을 업데이트하는 Delta Live Tables SCD 유형 1 및 형식 2 쿼리의 예를 제공합니다.
- 새 사용자 레코드를 만듭니다.
- 사용자 레코드를 삭제합니다.
- 사용자 레코드를 업데이트합니다. SCD 형식 1 예제에서 마지막
UPDATE
작업은 늦게 도착하여 대상 테이블에서 삭제되어 순서가 다른 이벤트의 처리를 보여 줍니다.
다음 예제에서는 Delta Live Tables 파이프라인을 구성하고 업데이트하는 데 익숙하다고 가정합니다. 자습서: 첫 번째 Delta Live Tables 파이프라인 실행을 참조하세요.
이러한 예제를 실행하려면 먼저 샘플 데이터 세트를 만들어야 합니다. 테스트 데이터 생성을 참조하세요.
다음은 이러한 예제에 대한 입력 레코드입니다.
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | 오악사카 | INSERT | 1 |
123 | Isabel | 몬테레이 | INSERT | 1 |
125 | Mercedes | 티후아나 | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | 과달라하라 | UPDATE | 6 |
125 | Mercedes | 멕시칼리 | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
예제 데이터에서 마지막 행의 주석 처리를 제거하면 레코드를 잘라야 하는 위치를 지정하는 다음 레코드가 삽입됩니다.
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
참고 항목
다음 예제에는 모두 작업과 TRUNCATE
작업을 모두 지정 DELETE
하는 옵션이 포함되어 있지만 각각은 선택 사항입니다.
SCD 유형 1 업데이트 처리
다음 예제에서는 SCD 유형 1 업데이트를 처리하는 방법을 보여 줍니다.
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD 유형 1 예제를 실행하면 대상 테이블에 다음 레코드가 포함됩니다.
userId | name | city |
---|---|---|
124 | Raul | 오악사카 |
125 | Mercedes | 과달라하라 |
126 | Lily | Cancun |
추가 TRUNCATE
레코드를 사용하여 SCD 형식 1 예제를 실행한 후 sequenceNum=3
에서 TRUNCATE
작업으로 인해 레코드 124
및 126
이 잘리고 대상 테이블에는 다음 레코드가 포함됩니다.
userId | name | city |
---|---|---|
125 | Mercedes | 과달라하라 |
SCD 유형 2 업데이트 처리
다음 예제에서는 SCD 형식 2 업데이트를 처리하는 방법을 보여 줍니다.
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD 유형 2 예제를 실행하면 대상 테이블에 다음 레코드가 포함됩니다.
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | 몬테레이 | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | 오악사카 | 1 | null |
125 | Mercedes | 티후아나 | 2 | 5 |
125 | Mercedes | 멕시칼리 | 5 | 6 |
125 | Mercedes | 과달라하라 | 6 | null |
126 | Lily | Cancun | 2 | null |
SCD 형식 2 쿼리는 대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합을 지정할 수도 있습니다. 다른 열에 대한 변경 내용은 새 기록 레코드를 생성하는 대신 해당 위치에서 업데이트됩니다. 다음 예제에서는 추적에서 열을 제외하는 city
방법을 보여 줍니다.
다음 예제에서는 SCD 유형 2에서 트랙 기록을 사용하는 방법을 보여 줍니다.
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
추가 TRUNCATE
레코드 없이 이 예제를 실행한 후 대상 테이블에는 다음 레코드가 포함됩니다.
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | 오악사카 | 1 | null |
125 | Mercedes | 과달라하라 | 2 | null |
126 | Lily | Cancun | 2 | null |
테스트 데이터 생성
아래 코드는 이 자습서에 있는 예제 쿼리에서 사용할 예제 데이터 세트를 생성하기 위해 제공됩니다. 새 스키마를 만들고 새 테이블을 만들기 위한 적절한 자격 증명이 있다고 가정하면 Notebook 또는 Databricks SQL을 사용하여 이러한 문을 실행할 수 있습니다. 다음 코드는 Delta Live Tables 파이프라인의 일부로 실행되지 않습니다 .
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
예: 정기적인 스냅샷 처리
다음 예제에서는 에 저장된 mycatalog.myschema.mytable
테이블의 스냅샷을 수집하는 SCD 형식 2 처리를 보여 줍니다. 처리 결과는 이름이 인 target
테이블에 기록됩니다.
mycatalog.myschema.mytable
타임스탬프 2024-01-01 00:00:00의 레코드
키 | 값 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
타임스탬프 2024-01-01 12:00:00의 레코드
키 | 값 |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
스냅샷을 처리한 후 대상 테이블에는 다음 레코드가 포함됩니다.
키 | 값 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
예: 기록 스냅샷 처리
다음 예제에서는 클라우드 스토리지 시스템에 저장된 두 스냅샷의 원본 이벤트를 기반으로 대상 테이블을 업데이트하는 SCD 형식 2 처리를 보여 줍니다.
에 저장된 스냅 timestamp
샷 /<PATH>/filename1.csv
키 | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
에 저장된 스냅 timestamp + 5
샷 /<PATH>/filename2.csv
키 | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
다음 코드 예제에서는 이러한 스냅샷을 사용하여 SCD 형식 2 업데이트를 처리하는 방법을 보여 줍니다.
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
스냅샷을 처리한 후 대상 테이블에는 다음 레코드가 포함됩니다.
키 | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
대상 스트리밍 테이블에서 데이터 추가, 변경 또는 삭제
파이프라인이 Unity 카탈로그에 테이블을 게시하는 경우 삽입, 업데이트, 삭제 및 병합 문을 비롯한 DML(데이터 조작 언어) 문을 사용하여 문으로 APPLY CHANGES INTO
만든 대상 스트리밍 테이블을 수정할 수 있습니다.
참고 항목
- 스트리밍 테이블의 테이블 스키마를 수정하는 DML 문은 지원되지 않습니다. DML 문이 테이블 스키마의 진화를 시도하지 않는지 확인합니다.
- 스트리밍 테이블을 업데이트하는 DML 문은 Databricks Runtime 13.3 LTS 이상을 사용하여 공유 Unity 카탈로그 클러스터 또는 SQL 웨어하우스에서만 실행할 수 있습니다.
- 스트리밍에는 추가 전용 데이터 원본이 필요하기 때문에 처리 시 변경 내용이 있는 원본 스트리밍 테이블에서 스트리밍이 필요한 경우(예: DML 문) 원본 스트리밍 테이블을 읽을 때 skipChangeCommits 플래그를 설정합니다.
skipChangeCommits
가 설정되면 원본 테이블에서 레코드를 삭제하거나 수정하는 트랜잭션은 무시됩니다. 처리에 스트리밍 테이블이 필요하지 않은 경우 구체화된 뷰(추가 전용 제한이 없음)를 대상 테이블로 사용할 수 있습니다.
Delta Live Tables는 지정된 SEQUENCE BY
열을 사용하고 적절한 시퀀싱 값을 __START_AT
대상 테이블의 열( __END_AT
SCD 형식 2의 경우)에 전파하므로 DML 문이 이러한 열에 유효한 값을 사용하여 적절한 레코드 순서를 유지해야 합니다. APPLY CHANGES API를 사용하여 CDC를 구현하는 방법을 참조하세요.
스트리밍 테이블과 함께 DML 문을 사용하는 방법에 대한 자세한 내용은 스트리밍 테이블의 데이터 추가, 변경 또는 삭제를 참조 하세요.
다음 예제에서는 시작 시퀀스가 5인 활성 레코드를 삽입합니다.
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
대상 테이블에서 변경 데이터 피드 APPLY CHANGES
읽기
Databricks Runtime 15.2 이상에서는 다른 델타 테이블에서 변경 데이터 피드를 읽는 것과 동일한 방식으로 대상 APPLY CHANGES
인 스트리밍 테이블 또는 APPLY CHANGES FROM SNAPSHOT
쿼리에서 변경 데이터 피드를 읽을 수 있습니다. 대상 스트리밍 테이블에서 변경 데이터 피드를 읽으려면 다음이 필요합니다.
- 대상 스트리밍 테이블을 Unity 카탈로그에 게시해야 합니다. Delta Live Tables 파이프라인에서 Unity 카탈로그 사용을 참조 하세요.
- 대상 스트리밍 테이블에서 변경 데이터 피드를 읽으려면 Databricks Runtime 15.2 이상을 사용해야 합니다. 다른 Delta Live Tables 파이프라인에서 변경 데이터 피드를 읽으려면 Databricks Runtime 15.2 이상을 사용하도록 파이프라인을 구성해야 합니다.
다른 Delta 테이블에서 변경 데이터 피드를 읽는 것과 동일한 방식으로 Delta Live Tables 파이프라인에서 만든 대상 스트리밍 테이블에서 변경 데이터 피드를 읽습니다. Python 및 SQL의 예제를 포함하여 델타 변경 데이터 피드 기능을 사용하는 방법에 대한 자세한 내용은 Azure Databricks에서 Delta Lake 변경 데이터 피드 사용을 참조 하세요.
참고 항목
변경 데이터 피드 레코드에는 변경 이벤트 유형을 식별하는 메타데이터가 포함됩니다. 테이블에서 레코드가 업데이트되면 연결된 변경 레코드에 대한 메타데이터에는 일반적으로 설정된 update_preimage
값과 update_postimage
이벤트가 포함됩니다_change_type
.
그러나 _change_type
기본 키 값 변경을 포함하는 대상 스트리밍 테이블에 대한 업데이트가 이루어지면 값이 다릅니다. 변경 내용에 기본 키 _change_type
에 대한 업데이트가 포함되면 메타데이터 필드가 설정되고 delete
이벤트가 설정 insert
됩니다. 기본 키에 대한 변경은 또는 문이 있는 키 필드 UPDATE
중 하나를 수동으로 업데이트하거나 MERGE
SCD 형식 2 테이블의 경우 필드가 이전 시작 시퀀스 값을 반영하도록 변경될 때 __start_at
발생할 수 있습니다.
쿼리는 APPLY CHANGES
SCD 유형 1 및 SCD 형식 2 처리에 대해 다른 기본 키 값을 결정합니다.
- SCD 형식 1 처리 및 Delta Live Tables Python 인터페이스의 경우 기본 키는 함수의
keys
매개 변수apply_changes()
값입니다. Delta Live Tables SQL 인터페이스의 경우 기본 키는 문의 절에APPLY CHANGES INTO
정의된KEYS
열입니다. - SCD 형식 2의 경우 기본 키는
keys
매개 변수 또는 절과KEYS
작업의 반환 값coalesce(__START_AT, __END_AT)
입니다. 여기서 대상 스트리밍 테이블의 해당 열은 다음과__START_AT
__END_AT
같습니다.
Delta Live Tables CDC 쿼리에서 처리된 레코드에 대한 데이터 가져오기
참고 항목
다음 메트릭은 쿼리가 아닌 쿼리에 의해 APPLY CHANGES
APPLY CHANGES FROM SNAPSHOT
서만 캡처됩니다.
다음 메트릭은 쿼리에 의해 APPLY CHANGES
캡처됩니다.
num_upserted_rows
: 업데이트 중에 데이터 세트에 삽입된 출력 행의 수입니다.num_deleted_rows
: 업데이트 중에 데이터 세트에서 삭제된 기존 출력 행의 수입니다.
num_output_rows
CDC가 아닌 흐름에 대한 출력인 메트릭은 쿼리에 대해 apply changes
캡처되지 않습니다.
Delta Live Tables CDC 처리에 사용되는 데이터 개체는 무엇인가요?
참고: 다음 데이터 구조는 처리가 아닌 APPLY CHANGES FROM SNAPSHOT
처리에 APPLY CHANGES
만 적용됩니다.
Hive 메타스토어에서 대상 테이블을 선언하면 두 개의 데이터 구조가 만들어집니다.
- 대상 테이블에 할당된 이름을 사용하는 뷰입니다.
- CDC 처리를 관리하기 위해 Delta Live Tables에서 사용하는 내부 지원 테이블입니다. 이 테이블의 이름은 대상 테이블 이름 앞에 추가하여
__apply_changes_storage_
지정됩니다.
예를 들어 명명 dlt_cdc_target
된 대상 테이블을 선언하면 메타스토어에 명명된 dlt_cdc_target
뷰와 테이블이 __apply_changes_storage_dlt_cdc_target
표시됩니다. 뷰를 만들면 Delta Live Tables에서 순서가 다른 데이터를 처리하는 데 필요한 추가 정보(예: 삭제 표시 및 버전)를 필터링할 수 있습니다. 처리된 데이터를 보려면 대상 뷰를 쿼리합니다. 테이블의 스키마가 __apply_changes_storage_
향후 기능 또는 향상된 기능을 지원하도록 변경될 수 있으므로 프로덕션 사용을 위해 테이블을 쿼리하면 안 됩니다. 테이블에 데이터를 수동으로 추가하는 경우 버전 열이 누락되어 레코드가 다른 변경 전에 오는 것으로 간주됩니다.
파이프라인이 Unity 카탈로그에 게시되는 경우 내부 지원 테이블에는 사용자가 액세스할 수 없습니다.