Обзор веб-канала изменений в Azure Cosmos DB

Завершено

Веб-канал изменений в Azure Cosmos DB собирает и сохраняет все изменения в контейнере порядке их возникновения. Канал изменений, поддерживаемый в Azure Cosmos DB, прослушивает изменения в контейнере Azure Cosmos DB. Затем он выводит отсортированный список документов в порядке, в котором они были изменены. Сохраненные изменения обрабатываются асинхронно и пошагово, а выходные данные могут распределяться в один или несколько объектов-получателей для параллельной обработки.

Канал изменений и разные операции

На сегодняшний момент вы видите все вставки и обновления в канале изменений. Возможность фильтрации канала изменений для просмотра только операций определенного типа не предусмотрена. В настоящее время канал изменений не регистрирует операции удаления. В качестве обходного решения в удаляемые элементы можно добавить программный маркер. Например, можно добавить атрибут в элемент с именем "удалено", задать значение true, а затем задать значение времени в реальном времени (TTL) для элемента. Настройка TTL гарантирует автоматическое удаление элемента.

Чтение канала изменений Azure Cosmos DB

Вы можете работать с веб-каналом изменений Azure Cosmos DB, используя модель принудительной отправки или модель извлечения. Если используется модель отправки, обработчик канала изменений отправляет операции клиенту, который имеет бизнес-логику для их обработки. Но сложность проверки операций и сохранения состояния для последних обработанных операций обрабатывается на обработчике канала изменений.

Если используется модель извлечения, клиент должен извлекать операции с сервера. В этом случае клиент имеет бизнес-логику для обработки работы, а также сохраняет состояние для последней обработанной работы. Клиент обрабатывает балансировку нагрузки между несколькими клиентами, которые обрабатываются параллельно, и обрабатывает ошибки.

Примечание.

Рекомендуется использовать модель push-уведомлений, так как вам не нужно беспокоиться о опросе канала изменений для будущих изменений, хранении состояния для последнего обработанного изменения и других преимуществ.

Большинство сценариев, использующих канал изменений Azure Cosmos DB, используют один из вариантов принудительной модели. Однако существуют некоторые сценарии, в которых может потребоваться дополнительный низкий уровень управления моделью извлечения. Дополнительный низкоуровневый элемент управления включает:

  • считывание изменений по определенному ключу секции;
  • необходимость в управлении скоростью, с которой клиент получает изменения для обработки;
  • однократный доступ к существующим данным в веб-канале изменений (например, для переноса данных).

Чтение канала изменений с помощью модели принудительной отправки

Существует два способа чтения из канала изменений с помощью push-модели: Функции Azure триггеров Azure Cosmos DB и библиотеки обработчика канала изменений. Функции Azure использует обработчик канала изменений за кулисами, поэтому они похожи на чтение канала изменений. Представьте себе Функции Azure в качестве платформы размещения для обработчика веб-канала изменений, а не совершенно иной способ чтения веб-канала изменений. Функции Azure использует обработчик канала изменений за кулисами. Она автоматически параллелизирует обработку изменений в секциях контейнера.

Функции Azure

Вы можете создать небольшие реактивные Функции Azure, которые автоматически активируются для каждого нового события в канале изменений контейнера Azure Cosmos DB. С помощью триггера Функции Azure для Azure Cosmos DB можно использовать функции масштабирования и надежного обнаружения событий обработчика канала изменений без необходимости поддерживать любую рабочую инфраструктуру.

Схема, показывающая канал изменений, запускающий Функции Azure для обработки.

Обработчик канала изменений

Обработчик канала изменений входит в SDK Azure Cosmos DB .NET версии 3 и Java версии 4. Он упрощает процесс чтения канала изменений и эффективно распределяет обработку событий между несколькими потребителями.

В реализации обработчика канала изменений существует четыре основных компонента:

  1. Отслеживаемый контейнер — это контейнер с данными, из которых формируется канал изменений. Все операции вставки и обновлений в отслеживаемом контейнере отражаются в канале изменений контейнера.

  2. Контейнер аренд. Контейнер аренды выступает в качестве хранилища состояний и координирует обработку канала изменений для нескольких рабочих ролей. Контейнер аренды может храниться в той же учетной записи, что и отслеживаемый контейнер, или в отдельной учетной записи.

  3. Вычислительный экземпляр. В вычислительном экземпляре размещается обработчик канала изменений для прослушивания изменений. В зависимости от платформы он может представлять виртуальную машину, модуль pod kubernetes, экземпляр службы приложение Azure, фактический физический компьютер. В нем есть уникальный идентификатор, который в статье называется именем экземпляра.

  4. Делегат — это код, в котором разработчик описывает требуемые действия для каждого пакета изменений, считанного обработчиком канала изменений.

При реализации обработчика канала изменений точка входа всегда является отслеживаемой контейнерой из вызываемого GetChangeFeedProcessorBuilderэкземпляраContainer:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Где первый параметр — это уникальное имя, описывающее цель этого процессора, а второе — реализация делегата, обрабатывающая изменения. Ниже приведен пример делегата:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

После этого вы определите имя вычислительного экземпляра или уникальный идентификатор WithInstanceNameс идентификатором. Это должно быть уникальным и разным в каждом развернутом вычислительном экземпляре, и, наконец, контейнером для поддержания состояния WithLeaseContainerаренды.

Вызов Build предоставляет экземпляр процессора, который можно запустить с помощью вызова StartAsync.

Обычный жизненный цикл экземпляра узла:

  1. Чтение канала изменений.
  2. Если изменений нет, спящий режим для предопределенного времени (настраиваемого с WithPollInterval помощью ) Builderи перейдите в #1.
  3. При наличии изменений отправьте их в делегат.
  4. Когда делегат успешно завершит обработку изменений, добавьте в хранилище аренды последнюю обработанную точку во времени и перейдите к № 1.