다음을 통해 공유


워터마크를 사용하여 Delta Live Tables에서 상태 저장 처리 최적화

상태를 유지하는 데이터를 효과적으로 관리하려면 집계, 조인 및 중복 제거를 포함하여 Delta Live Tables에서 상태 저장 스트림 처리를 수행할 때 워터마크를 사용합니다. 이 문서에서는 Delta Live Tables 쿼리에서 워터마크를 사용하는 방법을 설명하고 권장 작업의 예를 포함합니다.

참고 항목

집계를 수행하는 쿼리가 증분 방식으로 처리되고 각 업데이트에서 완전히 다시 계산되지 않도록 하려면 워터마크를 사용해야 합니다.

워터마크란?

스트림 처리 에서 워터마크는 집계와 같은 상태 저장 작업을 수행할 때 데이터를 처리하기 위한 시간 기반 임계값을 정의할 수 있는 Apache Spark 기능입니다. 데이터 도착은 임계값에 도달할 때까지 처리되며, 이때 임계값에 정의된 기간은 닫힙니다. 워터마크는 주로 더 큰 데이터 세트 또는 장기 실행 처리를 처리할 때 쿼리 처리 중에 문제를 방지하는 데 사용할 수 있습니다. 이러한 문제에는 결과 생성 시 대기 시간이 높고 처리 중 상태로 유지되는 데이터의 양 때문에 OOM(메모리 부족) 오류도 포함될 수 있습니다. 스트리밍 데이터는 기본적으로 순서가 지정되지 않으므로 워터마크는 시간 범위 집계와 같은 올바르게 계산 작업을 지원합니다.

스트림 처리 에서 워터마크를 사용하는 방법에 대한 자세한 내용은 Apache Spark 구조적 스트리밍 의 워터마킹 및 워터마크 적용을 참조하여 데이터 처리 임계값을 제어합니다.

워터마크를 정의하려면 어떻게 해야 할까요?

타임스탬프 필드와 지연 데이터가 도착하는 시간 임계값을 나타내는 값을 지정하여 워터마크를 정의합니다. 데이터는 정의된 시간 임계값 이후에 도착하는 경우 늦게 간주됩니다. 예를 들어 임계값이 10분으로 정의된 경우 10분 임계값 이후에 도착하는 레코드가 삭제될 수 있습니다.

정의된 임계값 이후에 도착하는 레코드는 삭제될 수 있으므로 대기 시간 및 정확성 요구 사항을 충족하는 임계값을 선택하는 것이 중요합니다. 임계값을 더 작게 선택하면 레코드가 더 빨리 내보내지지만 늦은 레코드가 삭제될 가능성이 더 높다는 의미이기도 합니다. 임계값이 클수록 대기 시간이 길어지지만 데이터의 완전성이 높아질 수 있습니다. 상태 크기가 커지므로 임계값이 클수록 추가 컴퓨팅 리소스가 필요할 수도 있습니다. 임계값은 데이터 및 처리 요구 사항에 따라 달라지므로 최적의 임계값을 결정하는 데 처리 테스트 및 모니터링이 중요합니다.

Python의 함수를 withWatermark() 사용하여 워터마크를 정의합니다. SQL에서 절을 WATERMARK 사용하여 워터마크를 정의합니다.

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

스트림 스트림 조인과 함께 워터마크 사용

스트림 스트림 조인의 경우 조인의 양쪽에 워터마크 및 시간 간격 절을 정의해야 합니다. 각 조인 원본에는 데이터의 불완전한 보기가 있으므로 더 이상 일치를 만들 수 없는 경우 스트리밍 엔진에 시간 간격 절이 필요합니다. 시간 간격 절은 워터마크를 정의하는 데 사용되는 것과 동일한 필드를 사용해야 합니다.

각 스트림에 워터마크에 대해 서로 다른 임계값이 필요한 경우가 있을 수 있으므로 스트림에 동일한 임계값이 필요하지 않습니다. 데이터 누락을 방지하기 위해 스트리밍 엔진은 가장 느린 스트림에 따라 하나의 전역 워터마크를 유지 관리합니다.

다음 예에서는 광고 노출 스트림과 광고 클릭 스트림을 조인합니다. 이 예제에서는 노출 후 3분 이내에 클릭이 발생해야 합니다. 3분 간격이 지나면 더 이상 일치시킬 수 없는 상태의 행이 삭제됩니다.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

워터마크를 사용하여 창이 있는 집계 수행

스트리밍 데이터에 대한 일반적인 상태 저장 작업은 창이 있는 집계입니다. 창 집계는 그룹화된 집계와 유사합니다. 단, 집계 값은 정의된 창의 일부인 행 집합에 대해 반환됩니다.

창은 특정 길이로 정의할 수 있으며 해당 창의 일부인 모든 행에서 집계 작업을 수행할 수 있습니다. Spark Streaming은 다음 세 가지 유형의 창을 지원합니다.

  • 연속(고정) 창: 고정 크기, 겹치지 않는 일련의 연속 시간 간격입니다. 입력 레코드는 단일 창에만 속합니다.
  • 슬라이딩 윈도우: 연속 창과 비슷하게 슬라이딩 윈도우는 고정 크기이지만 창이 겹칠 수 있으며 레코드가 여러 창으로 떨어질 수 있습니다.

데이터가 창의 끝과 워터마크의 길이를 지나 도착하면 창에 대해 새 데이터가 허용되지 않고 집계 결과가 내보내지고 창의 상태가 삭제됩니다.

다음 예제에서는 고정 창을 사용하여 5분마다 노출의 합계를 계산합니다. 이 예제에서 select 절은 별칭을 impressions_window사용한 다음 창 자체가 절의 GROUP BY 일부로 정의됩니다. 창은 이 예제의 열인 워터마크와 clickTimestamp 동일한 타임스탬프 열을 기반으로 해야 합니다.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

시간당 고정 기간 동안 수익을 계산하는 Python의 유사한 예:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

스트리밍 레코드 중복 제거

구조적 스트리밍은 정확히 한 번만 처리할 수 있지만 데이터 원본에서 레코드를 자동으로 중복 제거하지는 않습니다. 예를 들어 많은 메시지 큐에 한 번 이상의 보장이 있으므로 이러한 메시지 큐 중 하나에서 읽을 때 중복 레코드가 필요합니다. 함수를 dropDuplicatesWithinWatermark() 사용하여 지정된 필드에 대한 레코드를 중복 제거하여 일부 필드가 다른 경우에도(예: 이벤트 시간 또는 도착 시간) 스트림에서 중복 항목을 제거할 수 있습니다. 함수를 사용 dropDuplicatesWithinWatermark() 하려면 워터마크를 지정해야 합니다. 워터마크에 지정된 시간 범위 내에 도착하는 모든 중복 데이터는 삭제됩니다.

순서가 잘못된 데이터로 인해 워터마크 값이 잘못 진행되므로 정렬된 데이터가 중요합니다. 그런 다음 이전 데이터가 도착하면 늦게 삭제된 것으로 간주됩니다. withEventTimeOrder 워터마크에 지정된 타임스탬프에 따라 초기 스냅샷을 순서대로 처리하려면 이 옵션을 사용합니다. 이 withEventTimeOrder 옵션은 데이터 세트를 정의하는 코드 또는 파이프라인 설정에서 선언할 수 있습니다spark.databricks.delta.withEventTimeOrder.enabled. 예시:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

참고 항목

withEventTimeOrder 옵션은 Python에서만 지원됩니다.

다음 예제에서는 데이터가 순서에 따라 clickTimestamp처리되고 중복 userId clickAdId 및 열이 포함된 서로 5초 이내에 도착하는 레코드가 삭제됩니다.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

상태 저장 처리를 위한 파이프라인 구성 최적화

프로덕션 문제와 과도한 대기 시간을 방지하기 위해 Databricks는 상태 저장 스트림 처리에 대해 RocksDB 기반 상태 관리를 사용하도록 설정하는 것이 좋습니다. 특히 처리 시 많은 양의 중간 상태를 저장해야 하는 경우

세버리스 파이프라인은 상태 저장소 구성을 자동으로 관리합니다.

파이프라인을 배포하기 전에 다음 구성을 설정하여 RocksDB 기반 상태 관리를 사용하도록 설정할 수 있습니다.

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB에 대한 구성 권장 사항을 포함하여 RocksDB 상태 저장소에 대한 자세한 내용은 Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.