구조적 스트리밍 검사점
검사점 및 미리 쓰기 로그는 함께 작동하여 구조적 스트리밍 워크로드에 대한 처리 보장을 제공합니다. 검사점은 상태 정보 및 처리된 레코드를 포함하여 쿼리를 식별하는 정보를 추적합니다. 검사점 디렉터리에서 파일을 삭제하거나 새 검사점 위치로 변경하면 쿼리의 다음 실행이 새로 시작됩니다.
각 쿼리에는 다른 검사점 위치가 있어야 합니다. 여러 쿼리가 같은 위치를 공유해서는 안 됩니다.
구조적 스트리밍 쿼리에 검사점 설정 사용
다음 예제와 같이 스트리밍 쿼리를 실행하기 전에 checkpointLocation
옵션을 지정해야 합니다.
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
참고 항목
Notebook의 display()
용 출력 및 memory
싱크와 같은 일부 싱크는 이 옵션을 생략하면 임시 체크포인트 위치를 자동으로 생성합니다. 이러한 임시 검사점 위치는 내결함성 또는 데이터 일관성을 보장하지 않으며, 제대로 정리되지 않을 수 있습니다. Databricks는 항상 이러한 싱크에 대한 검사점 위치를 지정하는 것이 좋습니다.
구조적 스트리밍 쿼리 변경 후 복구
동일한 검사점 위치에서 다시 시작할 때 허용되는 스트리밍 쿼리 변경에는 제한이 있습니다. 다음은 허용되지 않거나 변경 효과가 명확하게 정의되지 않은 몇 가지 변경 사항입니다. 다음은 모든 변경에 적용됩니다.
- 허용된다는 용어는 지정된 변경을 수행할 수 있음을 의미하지만 변경 영향의 의미 체계가 잘 정의되었는지 여부는 쿼리와 변경 내용에 따라 다릅니다.
- 허용되지 않는다는 용어는 다시 시작된 쿼리가 예기치 않은 오류로 인해 실패할 가능성이 높으므로 지정된 변경을 수행하지 않아야 함을 의미합니다.
sdf
는sparkSession.readStream
으로 생성된 스트리밍 DataFrame/데이터 세트를 나타냅니다.
구조적 스트리밍 쿼리의 변경 유형
- 입력 원본의 개수 또는 형식(즉, 다른 원본) 변경: 허용되지 않습니다.
- 입력 원본의 매개 변수 변경: 허용 여부 및 변경의 의미 체계가 잘 정의되었는지 여부는 원본과 쿼리에 따라 다릅니다. 다음은 몇 가지 예입니다.
속도 제한 추가, 삭제, 수정은 허용됩니다. 예:
spark.readStream.format("kafka").option("subscribe", "article")
끝
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
구독된 문서 및 파일 변경은 결과를 예측할 수 없기 때문에 일반적으로 허용되지 않습니다. 예:
spark.readStream.format("kafka").option("subscribe", "article")
->spark.readStream.format("kafka").option("subscribe", "newarticle")
- 트리거 간격 변경: 증분 배치와 시간 간격 사이에서 트리거를 변경할 수 있습니다. 실행 간의 트리거 간격 변경을 참조하세요.
- 출력 싱크 형식 변경: 몇 가지 특정 싱크 조합 간 변경은 허용됩니다. 사례별로 확인해야 합니다. 다음은 몇 가지 예입니다.
- 파일 싱크 -> Kafka 싱크 변경은 허용됩니다. Kafka에는 새 데이터만 표시됩니다.
- Kafka 싱크 -> 파일 싱크 변경은 허용되지 않습니다.
- Kafka 싱크 -> foreach 변경 또는 그 반대로 변경은 허용됩니다.
- 출력 싱크의 매개 변수 변경: 허용 여부 및 변경의 의미 체계가 잘 정의되었는지 여부는 싱크와 쿼리에 따라 다릅니다. 다음은 몇 가지 예입니다.
- 파일 싱크의 출력 디렉터리 변경은 허용되지 않습니다. 예:
sdf.writeStream.format("parquet").option("path", "/somePath")
->sdf.writeStream.format("parquet").option("path", "/anotherPath")
- 출력 주제 변경이 허용됩니다:
sdf.writeStream.format("kafka").option("topic", "topic1")
에서sdf.writeStream.format("kafka").option("topic", "topic2")
로 - 사용자 정의 foreach 싱크(즉,
ForeachWriter
코드) 변경은 허용되지만 변경 의미 체계는 코드에 따라 다릅니다.
- 파일 싱크의 출력 디렉터리 변경은 허용되지 않습니다. 예:
- 프로젝션/필터/맵 유사 작업 변경: 일부 사례는 허용됩니다. 예:
- 필터 추가/삭제는 허용됩니다. 예:
sdf.selectExpr("a")
->sdf.where(...).selectExpr("a").filter(...)
- 동일한 출력 스키마를 사용하는 프로젝션 변경은 허용됩니다. 예:
sdf.selectExpr("stringColumn AS json").writeStream
->sdf.select(to_json(...).as("json")).writeStream
- 다른 출력 스키마를 사용하는 프로젝션 변경은 조건부로 허용됩니다. 예:
sdf.selectExpr("a").writeStream
->sdf.selectExpr("b").writeStream
변경은 출력 싱크에서"a"
->"b"
스키마 변경을 허용하는 경우에만 허용됩니다.
- 필터 추가/삭제는 허용됩니다. 예:
- 상태 저장 작업 변경: 스트리밍 쿼리의 일부 작업은 결과를 지속적으로 업데이트하기 위해 상태 데이터를 유지 관리해야 합니다. 구조적 스트리밍은 상태 데이터 검사점을 내결함성 스토리지(예: DBFS, Azure Blob Storage)에 자동으로 설정하고, 다시 시작된 후 복원합니다. 그러나 상태 데이터의 스키마는 다시 시작된 후에도 동일하게 유지된다고 가정합니다. 즉, 스트리밍 쿼리의 상태 저장소 작업에 대한 변경(즉, 추가, 삭제 또는 스키마 수정)은 재시작 사이에 허용되지 않습니다.라는 의미입니다. 다음은 상태 복구를 보장하기 위해 다시 시작할 때 스키마가 변경되면 안 되는 상태 저장 작업 목록입니다.
- 스트리밍 집계: 예를 들면
sdf.groupBy("a").agg(...)
입니다. 그룹화 키 또는 집계의 개수 또는 형식 변경은 허용되지 않습니다. - 스트리밍 중복 제거: 예를 들면
sdf.dropDuplicates("a")
입니다. 그룹화 키 또는 집계의 개수 또는 형식 변경은 허용되지 않습니다. - 스트림-스트림 조인: 예를 들면
sdf1.join(sdf2, ...)
입니다(즉, 두 입력이 모두sparkSession.readStream
으로 생성됨). 스키마 또는 동등 조인 열 변경은 허용되지 않습니다. 조인 유형(외부 또는 내부) 변경은 허용되지 않습니다. 기타 조인 조건 변경은 잘 정의되어 있지 않습니다. - 임의 상태 저장 작업: 예를 들면
sdf.groupByKey(...).mapGroupsWithState(...)
또는sdf.groupByKey(...).flatMapGroupsWithState(...)
입니다. 사용자 정의 상태 스키마 및 시간 제한 유형 변경은 허용되지 않습니다. 사용자 정의 상태 매핑 함수 내의 변경은 허용되지만 변경 영향의 의미 체계는 사용자 정의 논리에 따라 다릅니다. 상태 스키마 변경을 지원하려는 경우 스키마 마이그레이션을 지원하는 인코딩/디코딩 체계를 사용하여 복잡한 상태 데이터 구조를 바이트로 명시적으로 인코드/디코드할 수 있습니다. 예를 들어 상태를 Avro 인코딩된 바이트로 저장하는 경우 이진 상태를 복원하므로 쿼리를 다시 시작할 때 Avro 상태 스키마를 변경할 수 있습니다.
- 스트리밍 집계: 예를 들면