워터마크를 적용하여 데이터 처리 임계값 제어
이 문서에서는 워터마크의 기본 개념을 소개하고 일반적인 상태 저장 스트리밍 작업에서 워터마크 사용에 관한 권장 사항을 제공합니다. 메모리 문제를 야기시켜 장기 실행 스트리밍 작업 중에 처리 대기 시간이 증가시킬 수 있으므로 상태 저장 스트리밍 작업에 워터마크를 적용하여 상태로 유지되는 데이터의 양이 무한히 확장하지 않게 해야 합니다.
watermark이란 무엇입니까?
구조적 스트리밍은 워터마크를 사용하여 지정된 상태 엔터티에 대해 업데이트를 계속 처리하는 기간의 임계값을 제어합니다. 상태 엔터티의 일반적인 예는 다음과 같습니다.
- 시간의 흐름에 따른 집계 window.
- 두 스트림 사이 join의 고유 키입니다.
watermark을 선언할 때, 스트리밍 DataFrame에 타임스탬프 필드와 watermark 임계값을 지정합니다. 새 데이터가 도착하면 상태 관리자는 지정된 필드에서 가장 최근 타임스탬프를 추적하고 대기 시간 임계값 내의 모든 레코드를 처리합니다.
다음 예제에서는 창 수에 10분 watermark 임계값을 적용합니다.
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
이 예에서는 다음이 적용됩니다.
-
event_time
column는 10분 watermark와 5분 구르기 window를 정의하는 데 사용됩니다. - 겹치지 않는 각 5분 창에 대해 관찰된 각
id
창에 대해 개수가 수집됩니다. - 각 카운트에 대한 상태 정보는 window가 관찰된 최신
event_time
보다 10분 더 오래될 때까지 유지됩니다.
Important
Watermark 임계값은 지정된 임계값 내에 도착하는 레코드가 정의된 쿼리의 의미 체계에 따라 처리되도록 보장합니다. 지정된 임계값을 벗어나는 지연 도착 레코드는 여전히 쿼리 메트릭을 사용하여 처리될 수 있지만 이것이 보장되지는 않습니다.
워터마크는 처리 시간과 처리량에 어떻게 영향을 주나요?
워터마크는 출력 모드와 상호 작용하여 데이터가 싱크에 기록되는 시점을 제어합니다. 워터마크는 처리할 상태 정보의 총 양을 줄이기 때문에 효율적인 상태 저장 스트리밍 처리량을 위해 워터마크를 효과적으로 사용하는 것이 중요합니다.
참고 항목
모든 상태 저장 작업에 대해 모든 출력 모드가 지원되는 것은 아닙니다.
창 집계에 대한 워터마크 및 출력 모드
다음 tablewatermark 정의된 타임스탬프에서 집계가 있는 쿼리에 대한 처리를 자세히 설명합니다.
출력 모드 | 동작 |
---|---|
Append | watermark 임계값이 초과되면 행이 대상 table에 기록됩니다. 모든 쓰기는 대기 시간 임계값에 따라 지연됩니다. 임계값을 통과하면 이전 집계 상태가 삭제됩니다. |
Update | 행은 결과가 계산될 때 대상 table에 기록되며, 새 데이터가 도착하면 업데이트되거나 덮어쓸 수 있습니다. 임계값을 통과하면 이전 집계 상태가 삭제됩니다. |
Complete | 집계 상태가 삭제되지 않습니다. 트리거가 발생할 때마다 대상 table이(가) 다시 작성됩니다. |
스트림-스트림 조인에 대한 워터마크 및 출력
여러 스트림 간의 조인은 추가 모드만 지원하며 일치하는 레코드는 검색된 각 일괄 처리로 기록됩니다. 내부 조인의 경우 Databricks는 각 스트리밍 데이터 원본에 watermark 임계값을 설정하는 것이 좋습니다. 이렇게 하면 이전 레코드의 상태 정보를 버릴 수 있습니다. 워터마크가 없을 때, 구조적 스트리밍은 각 트리거로 join 양쪽의 모든 키에 대해 join을 시도합니다.
구조적 스트리밍에는 외부 조인을 지원하는 특별한 의미 체계가 있습니다. 일치하지 않게 된 후 키를 null 값으로 작성해야 하는 시점을 나타내기 때문에 외부 조인에는 워터마크가 필수입니다. 외부 조인은 데이터 처리 중에 일치하지 않는 레코드를 기록하는 데 유용할 수 있습니다. 그러나 조인이 추가 작업으로 tables에 쓰기만 하기 때문에 지연 임계값이 지나야 이 누락된 데이터가 기록됩니다.
구조적 스트리밍에서 여러 watermark 정책을 사용하여 지연 데이터 임계값 제어
여러 구조적 스트리밍 입력을 사용하는 경우 여러 워터마크를 set 지연 도착 데이터에 대한 허용 오차 임계값을 제어할 수 있습니다. 워터마크를 구성하면 상태 정보를 제어하고 대기 시간에 영향을 미칠 수 있습니다.
스트리밍 쿼리에 여러 입력 스트림이 통합되거나 함께 조인되어 있을 수 있습니다. 입력 스트림마다 상태 저장 작업에 허용되어야 하는 지연 데이터의 임계값이 다를 수 있습니다. 각 입력 스트림에서 withWatermarks("eventTime", delay)
를 사용하여 이러한 임계값을 지정합니다. 다음은 스트림-스트림 조인을 사용하는 예제 쿼리입니다.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
쿼리를 실행하는 동안 구조적 스트리밍은 각 입력 스트림에서 관찰되는 최대 이벤트 시간을 개별적으로 추적하고, 해당 지연에 따라 워터마크를 계산하며, 이러한 정보를 바탕으로 상태 저장 작업에 사용할 단일 전역 watermark을 선택합니다. 기본적으로, 하나의 스트림이 다른 스트림보다 뒤처질 경우에도 데이터가 실수로 삭제되는 일이 없도록 전역 watermark이 최소값으로 선택됩니다. (예: 하나의 스트림이 업스트림 실패로 인해 데이터 수신을 중단하는 경우) 즉, 전역 watermark는 가장 느린 스트림의 속도에 맞춰 안전하게 이동하며, 그에 따라 쿼리 출력이 지연됩니다.
더 빠른 결과를 get하고 싶다면, SQL 구성 spark.sql.streaming.multipleWatermarkPolicy
을 max
로 설정하여 여러 watermark 정책을 set함으로써 전역 watermark의 최대값을 선택할 수 있습니다(기본값은 min
). 이렇게 하면 전역 watermark가 가장 빠른 스트림의 속도로 이동할 수 있습니다. 그러나 이 구성은 가장 느린 스트림에서 데이터를 삭제합니다. 따라서 이 구성은 신중하게 사용하는 것이 좋습니다.
watermark 내에서 중복 항목 삭제
Databricks Runtime 13.3 LTS 이상에서는 고유한 identifier사용하여 watermark 임계값 내에서 레코드를 중복 제거할 수 있습니다.
구조적 스트리밍은 정확히 한 번 처리 보장을 제공하지만 데이터 원본에서 레코드를 자동으로 중복 제거하지는 않습니다.
dropDuplicatesWithinWatermark
을 사용하여 지정된 필드의 레코드를 중복 제거할 수 있으며, 일부 필드가 다르더라도(예: 이벤트 시간 또는 도착 시간) 스트림에서 중복된 항목을 remove 수 있습니다.
지정된 watermark 내에 도착하는 중복 레코드는 삭제됩니다. 이 보장은 한 방향으로만 엄격하며 지정된 임계값을 벗어나는 중복 레코드도 삭제될 수 있습니다. 모든 중복 항목을 remove 하기 위해 중복된 이벤트 간의 최대 타임스탬프 차이보다 긴 watermark 지연 임계값을 set 해야 합니다.
dropDuplicatesWithinWatermark
메서드를 사용하려면 watermark을 지정해야 합니다. 다음 예제를 참조하십시오.
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])