다음을 통해 공유


비동기 진행률 추적이란?

중요하다

이 기능은 공개 미리 보기.

비동기 진행률 추적을 사용하면 구조적 스트리밍 파이프라인이 마이크로 일괄 처리 내의 실제 데이터 처리와 동시에 진행률을 비동기적으로 병렬로 검사할 수 있으므로 offsetLog 유지 관리 및 commitLog관련된 대기 시간을 줄일 수 있습니다.

비동기 진행률 추적

메모

비동기 진행률 추적은 Trigger.once 또는 Trigger.availableNow 트리거에서 작동하지 않습니다. 이러한 트리거를 사용하여 이 기능을 사용하도록 설정하면 쿼리가 실패합니다.

비동기 진행률 추적은 대기 시간을 줄이기 위해 어떻게 작동하나요?

구조적 스트리밍은 오프셋을 유지 및 쿼리 처리에 대한 진행률 지표로 관리하는 데 의존합니다. Offset 관리 작업은 이러한 작업이 완료될 때까지 데이터 처리가 수행되지 않으므로 처리 대기 시간에 직접적인 영향을 줍니다. 비동기 진행률 추적을 사용하면 구조적 스트리밍 파이프라인이 이러한 offset 관리 작업의 영향을 받지 않고 진행률을 검사할 수 있습니다.

검사점 빈도는 언제 구성해야 하나요?

사용자는 진행률이 검사점이 되는 빈도를 구성할 수 있습니다. 검사점 빈도에 대한 기본 설정은 대부분의 쿼리에 적절한 처리량을 제공합니다. 빈도를 구성하는 것은 offset 관리 작업이 처리할 수 있는 것보다 더 높은 속도로 발생하는 시나리오에 유용하며, 이로 인해 offset 관리 작업의 백로그가 계속 증가합니다. 이러한 증가하는 백로그를 막기 위해 데이터 처리가 차단되거나 느려집니다. 기본적으로 처리 동작을 되돌려 비동기 진행률 추적의 이점을 제거합니다.

메모

검사점 간격 시간이 늘어나면 오류 복구 시간이 증가합니다. 오류가 발생할 경우 파이프라인은 이전의 성공적인 검사점 이전에 모든 데이터를 다시 처리해야 합니다. 사용자는 정기적인 처리의 낮은 대기 시간과 오류 발생 시의 복구 시간 사이의 균형을 고려할 수 있습니다.

비동기 진행률 추적과 연결된 구성은 무엇인가요?

선택 기본값 묘사
비동기 진행 추적 활성화 참/거짓 거짓 비동기 진행률 추적 사용 또는 사용 안 함
asyncProgressTrackingCheckpointIntervalMs (비동기 진행 추적 체크포인트 간격 밀리초) 밀리초 1000 오프셋과 완료 커밋을 처리하는 간격

사용자가 비동기 진행률 추적을 사용하도록 설정하려면 어떻게 할까요?

사용자는 아래 코드와 유사한 코드를 사용하여 이 기능을 사용하도록 설정할 수 있습니다.

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

비동기 진행률 추적 끄기

비동기 진행률 추적을 사용하는 경우 프레임워크는 모든 일괄 처리에 대한 진행률을 검사하지 않습니다. 이 문제를 해결하려면 비동기 진행률 추적을 사용하지 않도록 설정하기 전에 다음 설정을 사용하여 두 개 이상의 마이크로 일괄 처리를 처리합니다.

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

두 개 이상의 마이크로 배치 처리가 완료된 후 쿼리를 중지합니다. 이제 비동기 진행률 추적을 안전하게 사용하지 않도록 설정하고 쿼리를 다시 시작할 수 있습니다.

이 단계를 완료하지 않고 비동기 진행률 추적을 사용하지 않도록 설정한 경우 다음 오류가 발생할 수 있습니다.

java.lang.IllegalStateException: batch x doesn't exist

드라이버 로그에 다음 오류가 표시될 수 있습니다.

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

비동기 진행률 추적을 사용하지 않도록 설정하려면 이 섹션의 지침에 따라 이러한 오류를 해결하고 스트리밍 워크로드를 복구할 수 있습니다.

비동기 진행률 추적의 제한 사항

이 기능에는 다음과 같은 제한 사항이 있습니다.

  • 비동기 진행률 추적은 Kafka를 싱크로 사용하는 경우에만 상태 없는 파이프라인에서 지원됩니다.
  • 비동기 진행 추적에서는 실패 시 일괄 처리에 대한 offset 범위가 변경될 수 있기 때문에 정확히 한 번 처리의 종단 간 처리가 보장되지 않습니다. Kafka와 같은 일부 싱크는 정확히 한 번의 보장을 제공하지 않습니다.