Обзор веб-канала изменений в Azure Cosmos DB
Веб-канал изменений в Azure Cosmos DB собирает и сохраняет все изменения в контейнере порядке их возникновения. Канал изменений, поддерживаемый в Azure Cosmos DB, прослушивает изменения в контейнере Azure Cosmos DB. Затем он выводит отсортированный список документов в порядке, в котором они были изменены. Сохраненные изменения обрабатываются асинхронно и пошагово, а выходные данные могут распределяться в один или несколько объектов-получателей для параллельной обработки.
Канал изменений и разные операции
На сегодняшний момент вы видите все вставки и обновления в канале изменений. Возможность фильтрации канала изменений для просмотра только операций определенного типа не предусмотрена. В настоящее время канал изменений не регистрирует операции удаления. В качестве обходного решения в удаляемые элементы можно добавить программный маркер. Например, можно добавить атрибут в элемент с именем "удалено", задать значение true, а затем задать значение времени в реальном времени (TTL) для элемента. Настройка TTL гарантирует автоматическое удаление элемента.
Чтение канала изменений Azure Cosmos DB
Вы можете работать с веб-каналом изменений Azure Cosmos DB, используя модель принудительной отправки или модель извлечения. Если используется модель отправки, обработчик канала изменений отправляет операции клиенту, который имеет бизнес-логику для их обработки. Но сложность проверки операций и сохранения состояния для последних обработанных операций обрабатывается на обработчике канала изменений.
Если используется модель извлечения, клиент должен извлекать операции с сервера. В этом случае клиент имеет бизнес-логику для обработки работы, а также сохраняет состояние для последней обработанной работы. Клиент обрабатывает балансировку нагрузки между несколькими клиентами, которые обрабатываются параллельно, и обрабатывает ошибки.
Примечание.
Рекомендуется использовать модель push-уведомлений, так как вам не нужно беспокоиться о опросе канала изменений для будущих изменений, хранении состояния для последнего обработанного изменения и других преимуществ.
Большинство сценариев, использующих канал изменений Azure Cosmos DB, используют один из вариантов принудительной модели. Однако существуют некоторые сценарии, в которых может потребоваться дополнительный низкий уровень управления моделью извлечения. Дополнительный низкоуровневый элемент управления включает:
- считывание изменений по определенному ключу секции;
- необходимость в управлении скоростью, с которой клиент получает изменения для обработки;
- однократный доступ к существующим данным в веб-канале изменений (например, для переноса данных).
Чтение канала изменений с помощью модели принудительной отправки
Существует два способа чтения из канала изменений с помощью push-модели: Функции Azure триггеров Azure Cosmos DB и библиотеки обработчика канала изменений. Функции Azure использует обработчик канала изменений за кулисами, поэтому они похожи на чтение канала изменений. Представьте себе Функции Azure в качестве платформы размещения для обработчика веб-канала изменений, а не совершенно иной способ чтения веб-канала изменений. Функции Azure использует обработчик канала изменений за кулисами. Она автоматически параллелизирует обработку изменений в секциях контейнера.
Функции Azure
Вы можете создать небольшие реактивные Функции Azure, которые автоматически активируются для каждого нового события в канале изменений контейнера Azure Cosmos DB. С помощью триггера Функции Azure для Azure Cosmos DB можно использовать функции масштабирования и надежного обнаружения событий обработчика канала изменений без необходимости поддерживать любую рабочую инфраструктуру.
Обработчик канала изменений
Обработчик канала изменений входит в SDK Azure Cosmos DB .NET версии 3 и Java версии 4. Он упрощает процесс чтения канала изменений и эффективно распределяет обработку событий между несколькими потребителями.
В реализации обработчика канала изменений существует четыре основных компонента:
Отслеживаемый контейнер — это контейнер с данными, из которых формируется канал изменений. Все операции вставки и обновлений в отслеживаемом контейнере отражаются в канале изменений контейнера.
Контейнер аренд. Контейнер аренды выступает в качестве хранилища состояний и координирует обработку канала изменений для нескольких рабочих ролей. Контейнер аренды может храниться в той же учетной записи, что и отслеживаемый контейнер, или в отдельной учетной записи.
Вычислительный экземпляр. В вычислительном экземпляре размещается обработчик канала изменений для прослушивания изменений. В зависимости от платформы он может представлять виртуальную машину, модуль pod kubernetes, экземпляр службы приложение Azure, фактический физический компьютер. В нем есть уникальный идентификатор, который в статье называется именем экземпляра.
Делегат — это код, в котором разработчик описывает требуемые действия для каждого пакета изменений, считанного обработчиком канала изменений.
При реализации обработчика канала изменений точка входа всегда является отслеживаемой контейнерой из вызываемого GetChangeFeedProcessorBuilder
экземпляраContainer
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Где первый параметр — это уникальное имя, описывающее цель этого процессора, а второе — реализация делегата, обрабатывающая изменения. Ниже приведен пример делегата:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
После этого вы определите имя вычислительного экземпляра или уникальный идентификатор WithInstanceName
с идентификатором. Это должно быть уникальным и разным в каждом развернутом вычислительном экземпляре, и, наконец, контейнером для поддержания состояния WithLeaseContainer
аренды.
Вызов Build
предоставляет экземпляр процессора, который можно запустить с помощью вызова StartAsync
.
Обычный жизненный цикл экземпляра узла:
- Чтение канала изменений.
- Если изменений нет, спящий режим для предопределенного времени (настраиваемого с
WithPollInterval
помощью )Builder
и перейдите в #1. - При наличии изменений отправьте их в делегат.
- Когда делегат успешно завершит обработку изменений, добавьте в хранилище аренды последнюю обработанную точку во времени и перейдите к № 1.