Compartir a través de


Uso del calculador de la fuente de cambios

SE APLICA A: NoSQL

En este artículo se describe cómo puede supervisar el progreso de las instancias del procesador de la fuente de cambios a medida que leen la fuente de cambios.

¿Por qué es importante el progreso de la supervisión?

El procesador de la fuente de cambios actúa como un puntero que avanza por la fuente de cambios y entrega los cambios a una implementación de delegado.

La implementación del procesador de la fuente de cambios puede procesar los cambios a una velocidad determinada en función de los recursos disponibles, como la CPU, la memoria, la red, etc.

Si esta velocidad es más lenta que la velocidad a la que se producen los cambios en el contenedor de Azure Cosmos DB, el procesador comienza a retardar.

La identificación de este escenario ayuda a comprender si es necesario escalar la implementación del procesador de la fuente de cambios.

Implementación del calculador de la fuente de cambios

Como modelo de inserción para las notificaciones automáticas

Al igual que el procesador de la fuente de cambios, el estimador de la fuente de cambios funciona como un modelo de inserción. El estimador mide la diferencia entre el último elemento procesado (definido por el estado del contenedor de concesiones) y el último cambio en el contenedor e inserta este valor en un delegado. El intervalo en el que se toma la medida también puede personalizarse con un valor predeterminado de 5 segundos.

Por ejemplo, si el procesador de fuente de cambios usa el modo de versión más reciente y se define de esta manera:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

La manera correcta de inicializar un calculador para medir ese procesador sería usar GetChangeFeedEstimatorBuilder como se explica a continuación:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Donde tanto el procesador como el calculador comparten el mismo leaseContainer y el mismo nombre.

Los otros dos parámetros son el delegado, que recibe un número que representa cuántos cambios están pendientes de ser leídos por el procesador y el intervalo de tiempo en el que desea que se tome esta medida.

Un ejemplo de un delegado que recibe la estimación es:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Puede enviarla a su solución de supervisión y usarla para saber cómo se comporta el progreso con el tiempo.

Como una estimación detallada a petición

A diferencia del modelo de inserción, hay una alternativa que le permite obtener la estimación a petición. Este modelo también proporciona información más detallada:

  • Retardo estimado por concesión.
  • Instancia que posee y procesa cada concesión, de modo que puede identificar si hay algún problema en una instancia.

Si el procesador de la fuente de cambios se define de la siguiente manera:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Puede crear el estimador con la misma configuración de concesión:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Y siempre que quiera, con la frecuencia que necesite, puede obtener la estimación detallada:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Cada ChangeFeedProcessorState contiene la información de concesión y retraso, y también quién es la instancia actual que lo posee.

Implementación del estimador

No es necesario implementar el estimador de fuente de cambios como parte del procesador de fuente de cambios ni formar parte del mismo proyecto. Se recomienda implementar el estimador en una instancia independiente de los procesadores. Una sola instancia del estimador puede realizar un seguimiento del progreso de todas las concesiones e instancias de la implementación del procesador de fuente de cambios.

Cada estimación consume unidades de solicitud de los contenedores supervisados y de concesión. Una frecuencia de 1 minuto entre sí es un buen punto de partida; cuanto menor sea la frecuencia, mayor será la cantidad de unidades de solicitud consumidas.

Modos de fuente de cambios admitidos

El estimador de fuente de cambios se puede usar para modo de versión más reciente y todas las versiones y elimina el modo. En ambos modos, no se garantiza que la estimación proporcionada sea un recuento exacto de cambios pendientes en el proceso.

Recursos adicionales

Pasos siguientes

Ahora puede continuar para obtener más información sobre el procesador de fuente de cambios en el siguiente artículo: