상태 저장 스트리밍이란?
상태 저장 구조적 스트리밍 쿼리에는 중간 상태 정보에 대한 증분 업데이트가 필요한 반면 상태 비저장 구조적 스트리밍 쿼리는 원본에서 싱크로 처리된 행에 대한 정보만 추적합니다.
상태 저장 작업에는 스트리밍 집계, 스트리밍 dropDuplicates
, 스트림-스트림 조인, mapGroupsWithState
및 flatMapGroupsWithState
가 포함됩니다.
상태 저장 구조적 스트리밍 쿼리에 필요한 중간 상태 정보는 제대로 구성되지 않은 경우 예기치 않은 대기 시간 및 프로덕션 문제로 이어질 수 있습니다.
Databricks Runtime 13.3 LTS 이상에서는 RocksDB로 변경 로그 체크포인트를 활성화하여 구조화된 스트리밍 워크로드의 체크포인트 기간과 엔드투엔드 지연 시간을 줄일 수 있습니다. Databricks는 모든 구조적 스트리밍 상태 저장 쿼리에 대해 changelog 검사포인트를 사용하도록 설정할 것을 권합니다. 변경 로그 검사점 사용을 참조하세요.
상태 저장 구조적 스트리밍 쿼리 최적화
상태 저장 구조적 스트리밍 쿼리의 중간 상태 정보를 관리하면 예기치 않은 대기 시간 및 프로덕션 문제를 방지할 수 있습니다.
Databricks는 다음을 권장합니다.
- 컴퓨팅 최적화된 인스턴스를 작업자로 사용합니다.
- 순서 섞기 파티션 수를 클러스터 코어 수의 1~2배로 설정합니다.
- SparkSession에서
spark.sql.streaming.noDataMicroBatches.enabled
구성을false
로 설정합니다. 이렇게 하면 스트리밍 마이크로 일괄 처리 엔진이 데이터가 포함되지 않은 마이크로 일괄 처리를 처리할 수 없습니다. 또한 이 구성을false
로 설정하면 워터마크 또는 처리 시간 제한을 활용하는 상태 저장 작업이 데이터 출력을 즉시 가져오는 대신 새 데이터가 도착할 때까지 기다릴 수 있습니다.
Databricks는 상태 저장 스트림의 상태를 관리하기 위해 변경 로그 검사점과 함께 RocksDB를 사용하는 것이 좋습니다. Azure Databricks에서 RocksDB 상태 저장소 구성을 참조하세요.
참고 항목
쿼리를 다시 시작할 때 상태 관리 체계를 변경할 수 없습니다. 즉, 쿼리가 기본 관리로 시작된 경우 새 검사점 위치로 쿼리를 처음부터 시작하지 않는 한, 변경할 수 없습니다.
구조적 스트리밍에서 여러 상태 저장 연산자 작업
Databricks Runtime 13.3 LTS 이상에서 Azure Databricks는 구조적 스트리밍 워크로드의 상태 저장 연산자를 위한 고급 지원을 제공합니다. 이제 여러 상태 저장 연산자를 함께 연결할 수 있습니다. 즉, 창이 있는 집계와 같은 작업의 출력을 조인과 같은 다른 상태 저장 작업에 공급할 수 있습니다.
다음 예제에서는 사용할 수 있는 몇 가지 패턴을 보여 줍니다.
Important
여러 상태 저장 연산자를 사용하는 경우 다음과 같은 제한 사항이 있습니다.
FlatMapGroupWithState
은 지원되지 않습니다.- 추가 출력 모드만 지원됩니다.
연결된 시간 창 집계
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
서로 다른 두 스트림의 시간 창 집계와 스트림 스트림 창 조인
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
스트림-스트림 시간 간격 조인 후 시간 창 집계
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
구조적 스트리밍에 대한 상태 재조정
상태 재조정은 기본적으로 Delta Live Tables의 모든 스트리밍 워크로드에 대해 사용하도록 설정됩니다. Databricks Runtime 11.3 LTS 이상에서는 Spark 클러스터 구성에서 다음 구성 옵션을 설정하여 상태 재조정을 사용하도록 설정할 수 있습니다.
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
상태 리밸런싱은 클러스터 크기 조정 이벤트를 거치는 상태 저장 구조적 스트리밍 파이프라인의 이점을 제공합니다. 상태 비저장 스트리밍 작업은 클러스터 크기 변경에 관계없이 도움이 되지 않습니다.
참고 항목
구조화된 스트리밍 워크로드의 경우 컴퓨팅 자동 크기 조정에는 클러스터 크기를 스케일 다운하는 데 제한이 있습니다. Databricks는 스트리밍 워크로드에 대해 향상된 자동 크기 조정과 함께 Delta Live Tables를 사용하는 것이 좋습니다. 향상된 자동 크기 조정을 사용하여 Delta Live Tables 파이프라인의 클러스터 사용률 최적화를 참조 하세요.
클러스터 크기 조정 이벤트가 발생하면 상태 재조정이 트리거됩니다. 재조정 이벤트 중에는 클라우드 스토리지에서 새 실행기로 상태가 로드되므로 마이크로 배치의 지연 시간이 더 길어질 수 있습니다.
mapGroupsWithState
의 초기 상태 지정
flatMapGroupsWithState
또는 mapGroupsWithState
를 사용하여 구조적 스트리밍 상태 저장 처리에 대한 사용자 정의 초기 상태를 지정할 수 있습니다. 이렇게 하면 유효한 검사점 없이 상태 저장 스트림을 시작할 때 데이터를 다시 처리하지 않도록 할 수 있습니다.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
flatMapGroupsWithState
연산자에 초기 상태를 지정하는 예제 사용 사례:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
연산자에 초기 상태를 지정하는 예제 사용 사례:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
업데이트 기능 테스트
TestGroupState
API를 사용하면 Dataset.groupByKey(...).mapGroupsWithState(...)
및 Dataset.groupByKey(...).flatMapGroupsWithState(...)
에 사용된 상태 업데이트 함수를 테스트할 수 있습니다.
상태 업데이트 함수는 GroupState
형식의 개체를 통해 이전 상태를 입력으로 사용합니다. Apache Spark GroupState 참조 설명서를 참조하세요. 예시:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}