Partilhar via


Migrar da biblioteca do processador de feed de alterações para o SDK do Azure Cosmos DB .NET V3

APLICA-SE A: NoSQL

Este artigo descreve as etapas necessárias para migrar o código de um aplicativo existente que usa a biblioteca do processador de feed de alterações para o recurso de feed de alterações na versão mais recente do SDK do .NET (também conhecido como SDK do .NET V3).

Alterações de código necessárias

O SDK do .NET V3 tem várias alterações de quebra, a seguir estão as principais etapas para migrar seu aplicativo:

  1. Converta as DocumentCollectionInfo instâncias em Container referências para os contêineres monitorados e de locação.
  2. As personalizações que usam WithProcessorOptions devem ser atualizadas para uso WithLeaseConfiguration e WithPollInterval para intervalos, WithStartTime para a hora de início e WithMaxItems para definir a contagem máxima de itens.
  3. Defina o ativado processorName GetChangeFeedProcessorBuilder para corresponder ao valor configurado em ChangeFeedProcessorOptions.LeasePrefix, ou use string.Empty de outra forma.
  4. As alterações não são mais entregues como um IReadOnlyList<Document>, em vez disso, é um IReadOnlyCollection<T> onde é um tipo que T você precisa definir, não há mais classe de item base.
  5. Para lidar com as alterações, você não precisa mais de uma implementação do IChangeFeedObserver, em vez disso, você precisa definir um delegado. O delegado pode ser uma função estática ou, se você precisar manter o estado nas execuções, pode criar sua própria classe e passar um método de instância como delegado.

Por exemplo, se o código original para criar o processador de alimentação de alterações tiver a seguinte aparência:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

O código migrado terá a seguinte aparência:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Para o delegado, você pode ter um método estático para receber os eventos. Se você estava consumindo informações do IChangeFeedObserverContext você pode migrar para usar o ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken pode ser usado em vez de IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers pode ser usado em vez de IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contém informações detalhadas sobre latência de solicitação para solução de problemas
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Eventos de saúde e observabilidade

Se anteriormente você estava usando IHealthMonitor ou estava aproveitando IChangeFeedObserver.OpenAsync e IChangeFeedObserver.CloseAsync, use a API de notificações.

  • IChangeFeedObserver.OpenAsync pode ser substituído por WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync pode ser substituído por WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync pode ser substituído por WithErrorNotification.

Estado e contentor de leasing

Semelhante à biblioteca do processador de feed de alterações, o recurso de feed de alterações no SDK do .NET V3 usa um contêiner de concessão para armazenar o estado. No entanto, os esquemas são diferentes.

O processador de feed de alterações do SDK V3 detetará qualquer estado antigo da biblioteca e o migrará para o novo esquema automaticamente após a primeira execução do código do aplicativo migrado.

Você pode parar o aplicativo com segurança usando o código antigo, migrar o código para a nova versão, iniciar o aplicativo migrado e quaisquer alterações que aconteceram enquanto o aplicativo foi interrompido serão coletadas e processadas pela nova versão.

Recursos adicionais

Próximos passos

Agora você pode continuar para saber mais sobre o processador de alimentação de alterações nos seguintes artigos: