Azure Cosmos DB의 변경 피드 프로세서
적용 대상: NoSQL
변경 피드 프로세서는 Azure Cosmos DB .NET V3 및 Java V4 SDK의 일부입니다. 이를 통해 변경 피드를 읽고 이벤트 처리를 여러 소비자에게 효과적으로 배포하는 프로세스를 간소화할 수 있습니다.
변경 피드 프로세서를 사용할 때 얻을 수 있는 주요 혜택은 변경 피드의 모든 이벤트가 "적어도 한 번"은 전달되도록 보장하는 내결함성 설계입니다.
지원되는 SDK
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
변경 피드 프로세서의 구성 요소
변경 피드 프로세서는 다음과 같은 4가지 기본 구성 요소로 이루어져 있습니다.
모니터링된 컨테이너: 모니터링된 컨테이너에는 변경 피드가 생성되는 데이터가 있습니다. 모니터링된 컨테이너에 대한 모든 삽입 및 업데이트는 컨테이너의 변경 피드에 반영됩니다.
임대 컨테이너: 임대 컨테이너는 상태 스토리지 역할을 하며 여러 작업자의 변경 피드 처리를 조정합니다. 임대 컨테이너는 모니터링되는 컨테이너와 동일한 계정 또는 별도의 계정에 저장할 수 있습니다.
컴퓨팅 인스턴스: 컴퓨팅 인스턴스는 변경 내용을 수신 대기하도록 변경 피드 프로세서를 호스트합니다. 플랫폼에 따라 VM(가상 머신), kubernetes Pod, Azure App Service 인스턴스, 실제 물리적 머신으로 표현할 수 있습니다. 컴퓨팅 인스턴스는 이 문서에서 인스턴스 이름이라고 부르는 고유 식별자를 갖고 있습니다.
대리자: 대리자는 개발자가 변경 피드 프로세서에서 읽는 각 변경 내용 일괄 처리로 수행하려는 작업을 정의하는 코드입니다.
변경 피드 프로세서의 이러한 4가지 요소가 상호 작용하는 방식을 좀 더 자세히 알아보기 위해 다음 다이어그램의 예제를 살펴보겠습니다. 모니터링되는 컨테이너는 문서를 저장하며, "City"를 파티션 키로 사용합니다. 파티션 키 값은 항목을 포함하는 범위(각 범위는 물리적 파티션을 나타냄)에 분산됩니다.
이 다이어그램은 컴퓨팅 인스턴스 2개를 보여주며, 변경 피드 프로세서는 컴퓨팅 분산을 최대화하기 위해 각 인스턴스에 서로 다른 범위를 할당합니다. 각 인스턴스는 고유한 이름을 갖습니다.
각 범위를 병렬로 읽습니다. 범위의 진행률은 임대 문서를 통해 임대 컨테이너의 다른 범위와 별도로 유지 관리됩니다. 임대 조합은 변경 피드 프로세서의 현재 상태를 나타냅니다.
변경 피드 프로세서 구현
.NET의 변경 피드 프로세서는 최신 버전 모드 및 모든 버전 및 삭제 모드에서 사용할 수있습니다. 모든 버전 및 삭제 모드는 미리 보기로 제공되며 버전 3.40.0-preview.0
부터 변경 피드 프로세서에 대해 지원됩니다. 두 모드의 진입점은 언제나 모니터링되는 컨테이너입니다.
최신 버전 모드를 사용하여 읽으려면 Container
인스턴스에서 GetChangeFeedProcessorBuilder
를 호출합니다.
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
모든 버전 및 삭제 모드를 사용하여 읽으려면 Container
인스턴스에서 GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
를 호출합니다.
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
두 모드에서 첫 번째 매개 변수는 이 프로세서의 목표를 설명하는 고유한 이름입니다. 두 번째 이름은 변경 내용을 처리하는 대리자 구현입니다.
다음은 최신 버전 모드의 대리자 예시입니다.
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
다음은 모든 버전 및 삭제 모드의 대리자 예시입니다.
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
그 후 WithInstanceName
을 사용하여 컴퓨팅 인스턴스 이름 또는 고유 식별자를 정의합니다. 컴퓨팅 인스턴스 이름은 배포하는 컴퓨팅 인스턴스마다 고유하고 달라야 합니다. WithLeaseContainer
를 사용하여 임대 상태를 유지하도록 컨테이너를 설정합니다.
Build
를 호출하면 StartAsync
를 호출하여 시작할 수 있는 프로세서 인스턴스가 제공됩니다.
참고 항목
위의 코드 조각은 GitHub의 샘플에서 가져온 것입니다. 최신 버전 모드 또는 모든 버전 및 삭제 모드에 대한 샘플을 가져올 수 있습니다.
처리 수명 주기
호스트 인스턴스의 일반적인 수명 주기는 다음과 같습니다.
- 변경 피드를 읽습니다.
- 변경 내용이 없는 경우에는 미리 정의된 시간(빌더에서
WithPollInterval
을 사용하여 사용자 지정 가능) 동안 일시 중지한 후 #1로 이동합니다. - 변경 내용이 있으면 대리자에게 보냅니다.
- 성공적으로 변경 내용 처리를 완료하면 최신 처리 시점으로 임대 상점을 업데이트하고 #1로 이동합니다.
오류 처리
변경 피드 프로세서는 사용자 코드 오류에 대한 복원력이 있습니다. 대리자 구현에 처리되지 않은 예외가 있는 경우(#4단계) 해당 변경 내용의 일괄 처리를 처리하는 스레드가 중지되고, 결국에는 새 스레드가 만들어집니다. 새 스레드는 임대 저장소가 해당 파티션 키 값 범위에 대해 저장한 최신 시점을 확인합니다. 여기에서 새 스레드가 다시 시작되고, 동일한 변경 내용 일괄 처리를 대리자에게 효과적으로 보냅니다. 이 동작은 대리인이 변경 내용을 올바르게 처리할 때까지 지속되며, 변경 피드 프로세서가 "적어도 한 번" 보장하는 이유가 바로 여기에 있습니다.
참고 항목
오직 한 가지 시나리오에서만 변경 내용 일괄 처리가 다시 시도되지 않습니다. 첫 번째 대리자 실행에서 오류가 발생하면 임대 저장소에서 이전에 저장된 상태를 다시 시도에 사용할 수 없습니다. 이러한 경우 다시 시도에는 마지막 일괄 처리를 포함할 수도 있고 포함하지 않을 수도 있는 초기 시작 구성이 사용됩니다.
변경 피드 프로세서가 동일한 변경 내용의 일괄 처리를 계속해서 다시 시도하는 것에 “고착”되지 않도록 하려면 예외 발생 시 오류 있는 메시지 큐에 문서를 쓰도록 처리 함수에 논리를 추가해야 합니다. 이 디자인은 계속해서 나중에 변경 내용을 처리할 수 있는 동안 처리되지 않은 변경 내용을 추적할 수 있도록 합니다. 오류 있는 메시지 큐는 다른 Azure Cosmos DB 컨테이너일 수 있습니다. 정확한 데이터 저장소는 중요하지 않습니다. 처리되지 않은 변경 내용을 유지하기만 하면 됩니다.
또한 변경 피드 예측 도구를 사용하여 변경 피드 프로세서 인스턴스가 변경 피드를 읽을 때 진행률을 모니터링하거나, 수명 주기 알림을 사용하여 근본적인 오류를 감지할 수 있습니다.
수명 주기 알림
변경 피드 프로세서를 수명 주기의 관련 이벤트에 연결할 수 있습니다. 한 이벤트 또는 모든 이벤트에 대한 알림을 받도록 선택할 수 있습니다. 적어도 오류 알림은 등록하는 것이 좋습니다.
- 현재 호스트가 임대를 획득하여 처리를 시작할 때 알림을 받도록
WithLeaseAcquireNotification
의 처리기를 등록합니다. - 현재 호스트가 임대를 해제하고 처리를 중지할 때 알림을 받도록
WithLeaseReleaseNotification
의 처리기를 등록합니다. - 처리 중에 현재 호스트에서 예외가 발생하면
WithErrorNotification
알림을 받을 처리기를 등록합니다. 출처가 사용자 대리자(처리되지 않은 예외)인지 아니면 프로세서가 모니터링되는 컨테이너에 액세스하려고 할 때 발생하는 오류(예: 네트워킹 문제)인지 구분할 수 있어야 합니다.
수명 주기 알림은 두 변경 피드 모드에서 모두 사용할 수 있습니다. 다음은 최신 버전 모드의 수명 주기 알림 예시입니다.
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
배포 단위
단일 변경 피드 프로세서 배포 단위는 processorName
값이 동일하고 임대 컨테이너 구성이 동일하지만 인스턴스 이름은 서로 다른 하나 이상의 컴퓨팅 인스턴스로 구성됩니다. 각 단위의 변경 내용에 대한 비즈니스 흐름이 서로 다른 여러 배포 단위를 사용할 수 있으며 각 배포 단위는 하나 이상의 인스턴스로 구성됩니다.
예를 들어 컨테이너에 변경 내용이 있을 때마다 외부 API를 트리거하는 배포 단위가 하나 있다고 가정하겠습니다. 또 다른 배포 단위는 변경 내용이 있을 때마다 데이터를 실시간으로 이동해야 합니다. 모니터링되는 컨테이너에서 변경이 발생하면 모든 배포 단위에 알림이 전달됩니다.
동적 스케일링
앞서 언급했듯이, 배포 단위 내에서 하나 이상의 컴퓨팅 인스턴스를 사용할 수 있습니다. 배포 단위 내에서 컴퓨팅 분산을 활용하려면 충족해야 하는 유일한 핵심 요구 사항은 다음과 같습니다.
- 모든 인스턴스에 동일한 임대 컨테이너 구성이 있어야 합니다.
- 모든 인스턴스의
processorName
값이 동일해야 합니다. - 각 인스턴스에는 다른 인스턴스 이름(
WithInstanceName
)이 있어야 합니다.
이러한 세 조건이 적용되면 변경 피드 프로세서는 균등 분산 알고리즘을 사용하여 임대 컨테이너에 있는 모든 임대를 해당 배포 단위의 실행 중인 모든 인스턴스에 분산하고 컴퓨팅을 병렬화합니다. 언제나 하나의 인스턴스가 하나의 임대를 소유할 수 있으므로 인스턴스 수가 임대 수보다 크면 안 됩니다.
인스턴스 수가 증가할 수도 있고 감소할 수도 있습니다. 변경 피드 프로세서는 로드를 적절하게 재배포하여 동적으로 조정합니다.
뿐만 아니라 변경 피드 프로세서는 컨테이너의 처리량 또는 스토리지가 늘어나면 컨테이너의 스케일링을 동적으로 조정할 수 있습니다. 컨테이너가 커지면 변경 피드 프로세서는 임대를 동적으로 늘리고 새 임대를 기존 인스턴스에 분산하여 이러한 시나리오를 투명하게 처리합니다.
시작 시간
기본적으로 변경 피드 프로세서를 처음 시작하면 변경 피드 프로세서는 임대 컨테이너를 초기화하고 처리 수명 주기를 시작합니다. 변경 피드 프로세서가 처음으로 초기화되기 전에 모니터링되는 컨테이너에서 발생한 변경 내용은 감지되지 않습니다.
이전 날짜와 시간에서 읽기
DateTime
인스턴스를 WithStartTime
빌더 확장에 전달하여 특정 날짜와 시간에 시작되는 변경 내용을 읽도록 변경 피드 프로세서를 초기화할 수 있습니다.
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
특정 날짜와 시간에 대해 변경 피드 프로세서가 초기화되고, 변경 피드 프로세서는 이후에 발생한 변경 내용을 읽기 시작합니다.
시작부터 읽기
데이터 마이그레이션이나 컨테이너의 전체 기록 분석과 같은 다른 시나리오에서는 컨테이너의 수명이 시작되는 부분부터 변경 피드를 읽어야 합니다. 빌더 확장에서 WithStartTime
을 사용할 수 있지만, 다음 예제와 같이 최소 DateTime
값의 UTC 표현을 생성하는 DateTime.MinValue.ToUniversalTime()
을 전달합니다.
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
변경 피드 프로세서가 초기화되고, 변경 피드 프로세서는 컨테이너 수명이 시작되는 시점부터 변경 내용을 읽기 시작합니다.
참고 항목
이러한 사용자 지정 옵션은 변경 피드 프로세서의 시작점을 설정할 때만 작동합니다. 임대 컨테이너가 처음으로 초기화된 후에는 이러한 옵션을 변경해도 아무 효과가 없습니다.
시작점의 사용자 지정은 최신 버전 변경 피드 모드에서만 가능합니다. 모든 버전 및 삭제 모드를 사용하는 경우 프로세서가 시작된 시점부터 읽기를 시작하거나 계정의 지속적인 백업 보존 기간 내에 있는 이전 임대 상태에서 다시 시작해야 합니다.
변경 피드 및 프로비전된 처리량
모니터링되는 컨테이너에 대한 변경 피드 읽기 작업에는 요청 단위가 사용됩니다. 모니터링되는 컨테이너가 제한에 걸리지 않도록 해야 합니다. 제한에 걸리면 프로세서에서 변경 피드 이벤트를 수신할 때 시간이 지연됩니다.
임대 컨테이너에 대한 작업(상태 업데이트 및 유지 관리)은 요청 단위를 사용합니다. 동일한 임대 컨테이너를 사용하는 인스턴스가 많을수록 요청 단위 사용량이 많아질 수 있습니다. 임대 컨테이너가 제한에 걸리지 않도록 해야 합니다. 제한에 걸리면 변경 피드 이벤트를 수신할 때 시간이 지연됩니다. 심지어 제한 때문에 처리가 완전히 종료될 수도 있습니다.
임대 컨테이너 공유
여러 배포 단위에서 임대 컨테이너를 공유할 수 있습니다. 공유 임대 컨테이너에서 각 배포 단위는 모니터링되는 다른 컨테이너를 수신 대기하거나 다른 processorName
값을 갖습니다. 이 구성에서는 각 배포 단위가 임대 컨테이너에서 독립적인 상태를 유지합니다. 임대 컨테이너의 요청 단위 사용을 검토하여 프로비전된 처리량이 모든 배포 단위에 충분한지 확인하세요.
고급 임대 구성
세 가지 주요 구성은 변경 피드 프로세서의 작동 방식에 영향을 미칠 수 있습니다. 각 구성은 임대 컨테이너의 요청 단위 소비에 영향을 줍니다. 변경 피드 프로세서를 만들 때 다음 구성 중 하나를 설정할 수 있지만, 신중하게 사용해야 합니다.
- 임대 취득: 기본적으로 17초마다 수행됩니다. 호스트는 주기적으로 임대 저장소의 상태를 확인하고 동적 스케일링 프로세스에서 임대 획득을 고려합니다. 이 프로세스는 임대 컨테이너에 대한 쿼리를 실행하여 수행됩니다. 이 값을 줄이면 리밸런싱 및 임대 획득 속도가 빨라지지만, 임대 컨테이너의 요청 단위 소비가 증가합니다.
- 임대 만료: 기본적으로 60초입니다. 갱신 작업 없이 다른 호스트가 임대를 획득할 때까지 임대가 존재할 수 있는 최대 시간을 정의합니다. 호스트가 충돌하면 이 시간 + 구성된 갱신 간격이 경과한 후에 해당 호스트가 소유한 임대를 다른 호스트가 선택합니다. 이 값을 줄이면 호스트 충돌 후 복구 속도가 빨라지지만, 만료 값은 절대로 갱신 간격보다 낮으면 안 됩니다.
- 임대 갱신: 기본적으로 13초마다 수행됩니다. 임대를 소유한 호스트는 사용할 새 변경 내용이 없더라도 주기적으로 임대를 갱신합니다. 이 프로세스는 임대에서 바꾸기를 실행하여 수행됩니다. 이 값을 줄이면 호스트 충돌로 인해 손실된 임대를 감지하는 데 걸리는 시간이 줄어들지만, 임대 컨테이너의 요청 단위 소비가 증가합니다.
변경 피드 프로세서를 호스트할 위치
변경 피드 프로세서는 장기 실행 프로세스 또는 작업을 지원하는 모든 플랫폼에 호스트할 수 있습니다. 다음 몇 가지 예를 참조하세요.
- Azure App Service의 연속 실행 WebJobs 인스턴스
- Azure Virtual Machines 인스턴스의 프로세스
- Azure Kubernetes Service의 백그라운드 작업.
- Azure Functions의 서버리스 함수
- ASP.NET 호스티드 서비스.
임대 컨테이너가 상태를 유지하므로 변경 피드 프로세서를 수명이 짧은 환경에서 실행할 수 있지만, 환경이 시작될 때마다 프로세서를 시작하는 오버헤드로 인해 이러한 환경의 시작 주기는 알림 수신 시간을 지연시킵니다.
역할 기반 액세스 요구 사항
Microsoft Entra ID를 인증 메커니즘으로 사용하는 경우 ID에 적절한 권한이 있는지 확인합니다.
- 모니터링되는 컨테이너의 경우:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- 임대 컨테이너의 경우:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
추가 리소스
다음 단계
다음 문서에서 변경 피드 프로세서에 대해 자세히 알아보세요.