다음을 통해 공유


델타 table 스트리밍 읽기 및 쓰기

Delta Lake는 readStream을 통해 writeStream과 긴밀하게 통합됩니다. Delta Lake는 다음을 포함하여 일반적으로 스트리밍 시스템 및 파일과 관련된 많은 제한 사항을 극복합니다.

  • 짧은 대기 시간 수집으로 생성된 작은 파일을 병합합니다.
  • 둘 이상의 스트림(또는 동시 일괄 처리 작업)을 사용하여 "정확히 한 번" 처리를 유지 관리합니다.
  • 스트림의 원본으로 파일을 사용할 때 새로운 파일을 효율적으로 검색합니다.

참고 항목

이 문서에서는 Delta Lake tables 스트리밍 원본 및 싱크로 사용하는 방법에 대해 설명합니다. Databricks SQL에서 스트리밍 사용하여 데이터를 로드하는 방법을 알아보려면 Databricks SQL스트리밍 사용하여 데이터 로드 참조하세요.

Delta Lake를 사용한 스트림 정적 조인에 대한 자세한 내용은 Stream-static 조인을 참조 하세요.

델타 table 소스로서

구조적 스트리밍은 델타 tables를 점진적으로 읽습니다. 델타 table에 대해 스트리밍 쿼리가 활성화되어 있는 동안, 원본 table에 새 table 버전이 커밋될 때마다 새 레코드가 멱등하게 처리됩니다.

다음 코드 예제에서는 table 이름 또는 파일 경로를 사용하여 스트리밍 읽기를 구성하는 방법을 보여 줍니다.

Python

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Scala

spark.readStream.table("table_name")

spark.readStream.load("/path/to/table")

Important

스트리밍 읽기가 table에 대해 시작된 후 Delta table의 schema가 변경되면 쿼리가 실패합니다. 대부분의 schema 변경 내용의 경우 스트림을 다시 시작하여 schema 불일치를 해결하고 처리를 계속할 수 있습니다.

Databricks Runtime 12.2 LTS 이하에서는 columns이름 바꾸기 또는 삭제와 같이 비가산적 schema 진화된 column 매핑을 사용하도록 설정된 Delta table 스트리밍할 수 없습니다. 자세한 내용은 스트리밍 및 column 매핑, 그리고 schema 변경 내용을 참조하세요.

Limit 입력 속도

마이크로 일괄 처리를 제어하는 데 사용할 수 있는 옵션은 다음과 같습니다.

  • maxFilesPerTrigger: 모든 마이크로 일괄 처리에서 고려할 새 파일의 수입니다. 기본값은 1000입니다.
  • maxBytesPerTrigger: 각 마이크로 일괄 처리에서 처리되는 데이터의 양입니다. 이 옵션은 "소프트 최대값"을 설정하며, 이는 일괄 처리가 대략 이 양의 데이터를 처리함을 의미합니다. 그러나 입력 단위가 limit보다 큰 경우, 스트리밍 쿼리가 계속 진행되도록 하기 위해 limit보다 더 많은 데이터를 처리할 수도 있습니다. 기본값으로는 set이 아닙니다.

maxBytesPerTriggermaxFilesPerTrigger과 함께 사용하는 경우, 마이크로 일괄 처리는 maxFilesPerTrigger 또는 maxBytesPerTriggerlimit에 도달할 때까지 데이터를 처리합니다.

참고 항목

logRetentionDuration 구성 인해 원본 table 트랜잭션이 정리되고 스트리밍 쿼리가 해당 버전을 처리하려고 시도하는 경우 기본적으로 쿼리는 데이터 손실을 방지하지 못합니다. 옵션 failOnDataLoss을/를 set하여 false 손실된 데이터를 무시하고 처리를 계속할 수 있습니다.

Delta Lake CDC(변경 데이터 캡처) 피드 스트리밍

Delta Lake 은 변경 데이터 피드를 통해 델타 table의 업데이트 및 삭제를 포함한 변경 내용을 기록합니다. 사용 설정 시, 변경 데이터 피드에서 스트리밍을 하여 논리를 작성하고, 삽입, 업데이트 및 삭제를 후속 단계 tables에 처리할 수 있습니다. 변경 데이터 피드 데이터 출력은 설명한 델타 table 약간 다르지만 medallion 아키텍처다운스트림 tables 증분 변경 내용을 전파하는 솔루션을 제공합니다.

Important

Databricks Runtime 12.2 LTS 이하에서는 비가산적 schema 진화를 겪은 column 매핑이 설정된 Delta table에 대해, columns이름 바꾸기 또는 삭제와 같은 경우 변경 데이터 피드에서 스트리밍할 수 없습니다. 스트리밍을 column 매핑 및 schema 변경 내용와 함께 참조하세요.

업데이트 및 삭제 무시

구조적 스트리밍은 추가가 아닌 입력을 처리하지 않으며, 원본으로 사용되는 table에 수정이 발생하면 예외를 발생시킵니다. 다운스트림으로 자동 전파될 수 없는 변경 내용을 처리하기 위한 두 가지 주요 전략이 있습니다.

  • 출력과 검사점을 삭제하고 스트림을 처음부터 다시 시작할 수 있습니다.
  • 다음 두 옵션 중 하나를 선택할 수 set 있습니다.
    • ignoreDeletes: partition 경계에서 데이터를 삭제하는 트랜잭션을 무시합니다.
    • skipChangeCommits: 기존 레코드를 삭제하거나 수정하는 트랜잭션을 무시합니다. skipChangeCommitsignoreDeletes를 포함합니다.

참고 항목

Databricks Runtime 12.2 LTS 이상 skipChangeCommits 에서는 이전 설정을 ignoreChanges더 이상 사용하지 않습니다. Databricks Runtime 11.3 LTS 이하 ignoreChanges 에서 유일하게 지원되는 옵션입니다.

ignoreChanges의 의미 체계는 skipChangeCommits의 의미 체계와 크게 다릅니다. ignoreChanges 사용하도록 설정하면 UPDATE, MERGE INTO, DELETE(파티션 내) 또는 OVERWRITE같은 데이터 변경 작업 후에 원본 table 다시 작성된 데이터 파일이 다시 내보내집니다. 변경되지 않은 행은 새 행과 함께 내보내지는 경우가 많으므로 다운스트림 소비자는 중복을 처리할 수 있어야 합니다. 삭제는 다운스트림으로 전파되지 않습니다. ignoreChangesignoreDeletes를 포함합니다.

skipChangeCommits는 파일 변경 작업을 완전히 무시합니다. UPDATE, MERGE INTO, DELETEOVERWRITE 같은 데이터 변경 작업으로 인해 원본 table 다시 작성된 데이터 파일은 완전히 무시됩니다. 업스트림 원본 tables변경 내용을 반영하려면 이러한 변경 내용을 전파하는 별도의 논리를 구현해야 합니다.

구성된 ignoreChanges 워크로드는 알려진 의미 체계를 사용하여 계속 작동하지만 Databricks는 모든 새 워크로드에 사용하는 skipChangeCommits 것이 좋습니다. 워크로드를 ignoreChanges 마이그레이션하려면 skipChangeCommits 리팩터링 논리가 필요합니다.

예시

예를 들어, date, user_email, 및 actioncolumns가 있는 tableuser_eventsdate에 의해 분할된 경우를 가정해 봅시다. user_events table 스트림에서 데이터를 스트리밍 중이며 GDPR 준수를 위해 데이터를 삭제해야 합니다.

partition 경계(즉, WHEREpartitioncolumn)에서 삭제하는 경우 파일은 값에 따라 이미 분할되어 있으므로 삭제는 메타데이터에서 해당 파일을 제거합니다. 전체 데이터 partition를 삭제할 때, 다음을 사용할 수 있습니다.

spark.readStream
  .option("ignoreDeletes", "true")
  .table("user_events")

여러 파티션에서 데이터를 삭제하는 경우(이 예제에서는 필터링 user_email) 다음 구문을 사용합니다.

spark.readStream
  .option("skipChangeCommits", "true")
  .table("user_events")

UPDATE 문을 사용하여 user_emailupdate 경우 해당 user_email 포함하는 파일을 다시 작성합니다. 변경된 데이터 파일을 무시하는 데 사용합니다 skipChangeCommits .

초기 위치 지정

다음 옵션을 사용하여 전체 table처리하지 않고 Delta Lake 스트리밍 원본의 시작점을 지정할 수 있습니다.

  • startingVersion: 시작할 Delta Lake 버전입니다. Databricks는 대부분의 워크로드에 대해 이 옵션을 생략하는 것이 좋습니다. set않은 경우 스트림은 현재 table 전체 스냅샷을 포함하여 사용 가능한 최신 버전에서 시작됩니다.

    지정한 경우, 스트림은 지정된 버전(포함)에서 시작하여 Delta table의 모든 변경 내용을 읽습니다. 지정된 버전을 더 이상 사용할 수 없으면 스트림이 시작되지 않습니다. DESCRIBE HISTORY 명령 출력의 versioncolumn에서 커밋 버전을 가져올 수 있습니다.

    최신 변경 내용만 반환하려면 .를 지정합니다 latest.

  • startingTimestamp: 시작할 타임스탬프입니다. 타임스탬프(포함) 이후에 커밋된 모든 table 변경은 스트리밍 리더에서 읽습니다. 제공된 타임스탬프가 모든 table 커밋 앞에 오는 경우 스트리밍 읽기는 사용 가능한 가장 빠른 타임스탬프로 시작됩니다. 다음 중 하나입니다.

    • 타임스탬프 문자열입니다. 예들 들어 "2019-01-01T00:00:00.000Z"입니다.
    • 날짜 문자열입니다. 예들 들어 "2019-01-01"입니다.

두 옵션을 동시에 set 수 없습니다. 새 스트리밍 쿼리를 시작할 때만 적용됩니다. 스트리밍 쿼리가 시작되고 진행 상황이 검사점에 기록된 경우 이러한 옵션은 무시됩니다.

Important

지정된 버전 또는 타임스탬프에서 스트리밍 원본을 시작할 수 있지만, 스트리밍 원본의 schema은 항상 Delta table의 최신 schema이다. 지정된 버전 또는 타임스탬프 이후, 델타 table에 대해 호환되지 않는 schema의 변경이 없음을 확인해야 합니다. 그렇지 않으면 잘못된 schema사용하여 데이터를 읽을 때 스트리밍 원본이 잘못된 결과를 반환할 수 있습니다.

예시

예를 들어 tableuser_events있다고 가정합니다. 버전 5 이후의 변경 내용을 읽으려면 다음을 사용합니다.

spark.readStream
  .option("startingVersion", "5")
  .table("user_events")

2018년 10월 18일 이후의 변경 내용을 읽으려면 다음을 사용합니다.

spark.readStream
  .option("startingTimestamp", "2018-10-18")
  .table("user_events")

데이터 삭제 없이 초기 스냅샷 처리

참고 항목

이 기능은 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다. 이 기능은 공개 미리 보기 상태입니다.

델타 table을 스트림 원본으로 사용할 때, 쿼리는 먼저 table에 현재 있는 모든 데이터를 처리합니다. 이 버전의 Delta table 초기 스냅샷이라고 합니다. 기본적으로 Delta table데이터 파일은 마지막으로 수정된 파일에 따라 처리됩니다. 그러나 마지막 수정 시간이 반드시 레코드 이벤트 시간 순서를 나타내는 것은 아닙니다.

정의된 watermark이 있는 상태 저장 스트리밍 쿼리에서 파일을 수정 시간에 따라 처리하면, 레코드가 오류가 있는 순서로 처리될 수 있습니다. 이로 인해 레코드가 watermark지연 이벤트로 떨어질 수 있습니다.

다음 옵션을 사용해서 데이터 삭제 문제를 방지할 수 있습니다.

  • withEventTimeOrder: 초기 스냅샷을 이벤트 시간 순서로 처리할지 여부입니다.

이벤트 시간 순서를 사용하면 초기 스냅샷 데이터의 이벤트 시간 범위가 시간 버킷으로 분할됩니다. 각 마이크로 일괄 처리는 시간 범위 내에서 데이터를 필터링하여 버킷을 처리합니다. maxFilesPerTrigger 및 maxBytesPerTrigger 구성 옵션은 여전히 마이크로 일괄 처리 크기를 제어하는 데 적용할 수 있지만 처리 특성으로 인해 대략적인 방식으로만 적용됩니다.

아래 그래픽은 이 프로세스를 보여줍니다.

초기 스냅샷

이 기능의 중요 정보는 다음과 같습니다.

  • 데이터 삭제 문제는 상태 저장 스트리밍 쿼리의 초기 델타 스냅샷이 기본 순서로 처리될 때만 발생합니다.
  • 초기 스냅샷이 처리되는 동안 스트림 쿼리가 시작된 후에는 withEventTimeOrder를 변경할 수 없습니다. withEventTimeOrder를 변경한 상태로 다시 시작하려면 체크포인트를 삭제해야 합니다.
  • withEventTimeOrder를 사용하여 스트림 쿼리를 실행하는 경우 초기 스냅샷 처리가 완료될 때까지 이 기능을 지원하지 않는 DBR 버전으로 다운그레이드할 수 없습니다. 다운그레이드해야 하는 경우 초기 스냅샷이 완료될 때까지 기다리거나 체크포인트를 삭제하고 쿼리를 다시 시작할 수 있습니다.
  • 이 기능은 다음과 같은 일반적이지 않은 시나리오에서는 지원되지 않습니다.
    • 이벤트 시간 column는 생성된 column과 같으며, 델타 원본과 watermark간에는 프로젝션되지 않은 변환이 존재합니다.
    • 스트림 쿼리에 델타 원본이 둘 이상 있는 watermark가 있습니다.
  • 이벤트 시간 순서를 사용하도록 설정하면 델타 초기 스냅샷 처리의 성능이 느려질 수 있습니다.
  • 각 마이크로 일괄 처리에서는 초기 스냅샷을 스캔하여 해당 이벤트 시간 범위 내에서 데이터를 필터링합니다. 더 빠른 필터 동작을 위해, 이벤트 시간으로 델타 소스 를 사용하는 것이 좋습니다. 이렇게 하면 데이터 건너뛰기를 적용할 수 있습니다. Delta Lake의 데이터 건너뛰기 적용 가능 여부는 을 확인하십시오. 이와 더불어, 이벤트 시간 column을 기준으로 한 table 분할은 처리 속도를 더욱 높일 수 있습니다. Spark UI에서 특정 마이크로 일괄 처리에 대해 델타 파일이 얼마나 많이 스캔되었는지 확인할 수 있습니다.

예시

당신에게 event_timecolumn이 있는 tableuser_events이 있다고 가정해 보세요. 스트리밍 쿼리는 집계 쿼리입니다. 초기 스냅샷 처리 중 데이터가 삭제되지 않도록 하려면 다음을 사용할 수 있습니다.

spark.readStream
  .option("withEventTimeOrder", "true")
  .table("user_events")
  .withWatermark("event_time", "10 seconds")

참고 항목

또한 클러스터의 Spark 구성을 사용하여 이 설정을 사용하도록 설정할 수도 있습니다. 이 설정은 모든 스트리밍 쿼리에 적용됩니다. spark.databricks.delta.withEventTimeOrder.enabled true

델타 table을 싱크로

구조적 스트리밍을 사용하여 Delta table에 데이터를 쓸 수도 있습니다. 트랜잭션 로그를 사용하면 table에 대해 다른 스트림이나 배치 쿼리가 동시에 실행되는 경우에도 Delta Lake가 정확히 한 번의 처리를 보장합니다.

참고 항목

Delta Lake VACUUM 함수는 Delta Lake에서 관리하지 않는 모든 파일을 제거하되, _로 시작되는 디렉터리는 건너뜁니다. Delta table에 대한 다른 데이터 및 메타데이터와 함께, <table-name>/_checkpoints같은 디렉터리 구조를 사용하여 검사점을 안전하게 저장할 수 있습니다.

메트릭

numBytesOutstanding 메트릭으로 numFilesOutstanding에서 아직 처리되지 않은 바이트 수와 파일 수를 확인할 수 있습니다. 추가 메트릭은 다음과 같습니다.

  • numNewListedFiles: 이 일괄 처리에 대한 백로그를 계산하기 위해 나열된 Delta Lake 파일의 수입니다.
    • backlogEndOffset: 백로그를 계산하는 데 사용되는 table 버전입니다.

Notebook에서 스트림을 실행하는 경우 스트리밍 쿼리 진행률 대시보드의 원시 데이터 탭에서 이러한 메트릭을 볼 수 있습니다.

{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

추가 모드

기본적으로 스트림은 추가 모드로 실행되며, 새 레코드가 table에 추가됩니다.

다음 예제와 같이 tables스트리밍할 때 toTable 메서드를 사용합니다.

Python

(events.writeStream
   .outputMode("append")
   .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
   .toTable("events")
)

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

전체 모드

구조적 스트리밍을 사용하여 매번 모든 배치에서 전체 table를 바꿀 수도 있습니다. 한 가지 사용 사례는 집계를 사용하여 요약을 계산하는 것입니다.

Python

(spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")
)

Scala

spark.readStream
  .table("events")
  .groupBy("customerId")
  .count()
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .toTable("events_by_customer")

앞의 예제는 고객별 이벤트의 집계 수를 포함하는 table을 지속적으로 업데이트합니다.

대기 시간 요구 사항이 더 관대한 애플리케이션의 경우 일회성 트리거로 컴퓨팅 리소스를 절약할 수 있습니다. 이를 사용하여 지정된 일정에 따라 update 요약 집계 tables를 수행하고, 마지막 update이후에 도착한 새 데이터만 처리합니다.

foreachBatch를 사용하여 스트리밍 쿼리에서 upsert

스트리밍 쿼리에서 mergeforeachBatch을 조합하여 Delta table에 복잡한 upsert를 작성할 수 있습니다. foreachBatch를 사용하여 임의의 데이터 싱크에 쓰기를 참조하세요.

이 패턴에는 다음을 포함하여 많은 애플리케이션이 있습니다.

  • Update 모드스트리밍 집계 작성: 전체 모드보다 훨씬 효율적입니다.
  • 델타 데이터베이스 변경 내용 스트림 작성: 변경 데이터 쓰기 위한 병합 쿼리를 사용하여 델타 변경 내용 스트림을 계속 적용할 수 있습니다.
  • 데이터 스트림을 델타 table에 쓰는 중복 제거 작업:자동 중복 제거로 델타 table에 데이터를 지속적으로 쓰기 위해 insert전용 병합 쿼리foreachBatch에서 사용할 수 있습니다.

참고 항목

  • 스트리밍 쿼리가 다시 시작되면 작업이 동일한 데이터 일괄 처리에 여러 차례 적용될 수 있으므로 merge 내부의 foreachBatch 문이 idempotent해야 합니다.
  • mergeforeachBatch에서 사용되는 경우, (StreamingQueryProgress를 통해 보고되고 Notebook 속도 그래프에 표시되는) 스트리밍 쿼리의 입력 데이터 속도는 원본에서 데이터가 생성되는 실제 속도의 배수로 보고될 수 있습니다. 이는 merge가 입력 데이터를 여러 번 읽어서 입력 메트릭이 곱해지기 때문입니다. 이것이 병목 상태가 된다면 merge 앞에서 일괄 처리 DataFrame을 캐시하고 merge 뒤에서 캐시 해제하면 됩니다.

다음 예제에서는 SQL을 foreachBatch 사용하여 이 작업을 수행하는 방법을 보여 줍니다.

Scala

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  // Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  # Set the dataframe to view name
  microBatchOutputDF.createOrReplaceTempView("updates")

  # Use the view name to apply MERGE
  # NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

  # In Databricks Runtime 10.5 and below, you must use the following:
  # microBatchOutputDF._jdf.sparkSession().sql("""
  microBatchOutputDF.sparkSession.sql("""
    MERGE INTO aggregates t
    USING updates s
    ON s.key = t.key
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

다음 예제와 같이 Delta Lake API를 사용하여 스트리밍 upsert를 수행하도록 선택할 수도 있습니다.

Scala

import io.delta.tables.*

val deltaTable = DeltaTable.forName(spark, "table_name")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "table_name")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  (deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
  )

# Write the output of a streaming aggregation query into Delta table
(streamingAggregatesDF.writeStream
  .foreachBatch(upsertToDelta)
  .outputMode("update")
  .start()
)

foreachBatch에서 Idempotent table 쓰기

참고 항목

Databricks는 update각 싱크에 대해 별도의 스트리밍 쓰기를 구성하는 것이 좋습니다. foreachBatch을 사용하여 여러 tables 쓰기를 직렬화하면 병렬 처리가 감소하고 전체 대기 시간이 증가합니다.

델타 tablesforeachBatch idempotent 내의 여러 tables 쓰기를 위해 다음 DataFrameWriter 옵션을 지원합니다.

  • txnAppId: 각 DataFrame 쓰기에 전달할 수 있는 고유 문자열입니다. 예를 들어 StreamingQuery ID를 txnAppId로 사용할 수 있습니다.
  • txnVersion: 트랜잭션 버전 역할을 하는 단조 증가하는 숫자입니다.

Delta Lake는 txnAppIdtxnVersion 조합을 사용하여 중복 쓰기를 식별하고 무시합니다.

일괄 처리 쓰기가 실패로 중단된 경우 일괄 처리를 다시 실행하면 동일한 애플리케이션 및 일괄 처리 ID를 사용하여 런타임이 중복 쓰기를 올바르게 식별하고 무시하도록 도와줍니다. 애플리케이션 ID(txnAppId)는 사용자 생성 고유 문자열일 수 있으며 스트림 ID와 관련될 필요는 없습니다. foreachBatch를 사용하여 임의의 데이터 싱크에 쓰기를 참조하세요.

Warning

스트리밍 검사점을 삭제하고 새 검사점을 사용하여 쿼리를 다시 시작하는 경우 다른 txnAppId항목을 제공해야 합니다. 새 검사점은 일괄 처리 ID로 시작합니다 0. Delta Lake는 일괄 처리 ID 및 txnAppId 고유 키로 사용하고 이미 표시된 values있는 일괄 처리를 건너뜁니다.

다음 코드 예제에서는 이 패턴을 보여 줍니다.

Python

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2

streamingDF.writeStream.foreachBatch(writeToDeltaLakeTableIdempotent).start()

Scala

val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}