foreachBatch를 사용하여 임의 데이터 싱크에 쓰기
이 문서에서는 구조적 스트리밍을 사용하여 foreachBatch
기존 스트리밍 싱크가 없는 데이터 원본에 스트리밍 쿼리의 출력을 쓰는 방법을 설명합니다.
코드 패턴을 streamingDF.writeStream.foreachBatch(...)
사용하면 스트리밍 쿼리의 모든 마이크로 일괄 처리의 출력 데이터에 일괄 처리 함수를 적용할 수 있습니다.
foreachBatch
에 사용되는 함수에는 두 개의 parameters이 필요합니다.
- 마이크로 일괄 처리의 출력 데이터가 있는 DataFrame입니다.
- 마이크로 일괄 처리의 고유 ID입니다.
구조적 스트리밍에서 Delta Lake 병합 작업에 사용해야 foreachBatch
합니다. foreachBatch를 사용하여 스트리밍 쿼리에서 Upsert를 참조하세요.
추가 DataFrame 작업 적용
Spark는 이러한 경우 증분 계획 생성을 지원하지 않으므로 많은 DataFrame 및 데이터 세트 작업이 스트리밍 DataFrame에서 지원되지 않습니다.
foreachBatch()
를 사용하면 각 마이크로 일괄 처리 출력에 이러한 작업 중 일부를 적용할 수 있습니다. 예를 들어, foreachBatch()
및 SQL MERGE INTO
작업을 사용하여 Delta table를 update 모드에서 스트리밍 집계의 출력으로 쓸 수 있습니다. 자세한 내용은 MERGE INTO에서 확인하세요.
Important
-
foreachBatch()
는 한 번 이상의 쓰기 보장만 제공합니다. 함수에 제공된batchId
를 사용하여 출력을 중복 제거하고 get 정확히 한 번만 실행할 수 있도록 보장할 수 있습니다. 두 경우 모두 엔드투엔드 의미 체계에 대해 직접 추론해야 합니다. -
foreachBatch()
는 기본적으로 스트리밍 쿼리의 마이크로 일괄 처리 실행에 의존하기 때문에 연속 처리 모드에서 작동하지 않습니다. 연속 모드로 데이터를 작성하는 경우 대신foreach()
를 사용합니다.
foreachBatch()
를 사용하여 빈 데이터 프레임을 호출할 수 있으며 사용자 코드는 적절한 작업을 허용하기 위해 복원력이 있어야 합니다. 예는 다음과 같습니다.
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0의 동작 변경 내용 foreachBatch
공유 액세스 모드로 구성된 컴퓨팅의 Databricks Runtime 14.0 이상에서는 다음과 같은 동작 변경이 적용됩니다.
-
print()
명령은 드라이버 로그에 출력을 씁니다. - 함수 내의
dbutils.widgets
하위 코드에 액세스할 수 없습니다. - 함수에서 참조되는 모든 파일, 모듈 또는 개체는 직렬화 가능하며 Spark에서 사용할 수 있어야 합니다.
기존 일괄 처리 데이터 원본 다시 사용
를 사용하면 foreachBatch()
구조적 스트리밍이 지원되지 않을 수 있는 데이터 싱크에 기존 일괄 처리 데이터 기록기를 사용할 수 있습니다. 다음은 몇 가지 예입니다.
다른 많은 일괄 처리 데이터 원본은 .에서 foreachBatch()
사용할 수 있습니다. 데이터 원본에 대한 연결을 참조 하세요.
여러 위치에 쓰기
스트리밍 쿼리의 출력을 여러 위치에 작성해야 하는 경우 Databricks는 최상의 병렬화 및 처리량을 위해 여러 구조적 스트리밍 기록기를 사용하는 것이 좋습니다.
여러 싱크에 쓰기를 사용하면 foreachBatch
스트리밍 쓰기의 실행이 직렬화되어 각 마이크로 일괄 처리에 대한 대기 시간이 증가할 수 있습니다.
여러 Delta