Eksplorowanie zestawienia zmian w usłudze Azure Cosmos DB
Zestawienie zmian w usłudze Azure Cosmos DB to trwały rekord zmian w kontenerze w kolejności ich wystąpienia. Obsługa zestawienia zmian w usłudze Azure Cosmos DB działa przez nasłuchiwanie zmian w kontenerze usługi Azure Cosmos DB. Następnie tworzone są dane wyjściowe w postaci posortowanej listy zmienionych dokumentów w kolejności, w której zostały zmodyfikowane. Utrwalone zmiany mogą być przetwarzane asynchronicznie i przyrostowo, a dane wyjściowe mogą być dystrybuowane między co najmniej jednym odbiorcą przetwarzania równoległego.
Zestawienie zmian i różne operacje
Obecnie wszystkie wstawki i aktualizacje są widoczne w kanale informacyjnym zmian. Nie można filtrować zestawienia zmian dla określonego typu operacji. Obecnie kanał informacyjny zmian nie rejestruje operacji usuwania. Aby obejść ten problem, można dodać znacznik nietrwały do elementów, które są usuwane. Na przykład możesz dodać atrybut w elemencie o nazwie "deleted", ustawić jego wartość na "true", a następnie ustawić wartość czasu wygaśnięcia (TTL) dla elementu. Ustawienie czasu wygaśnięcia gwarantuje, że element zostanie automatycznie usunięty.
Odczytywanie zestawienia zmian w usłudze Azure Cosmos DB
Możesz pracować z zestawieniem zmian usługi Azure Cosmos DB przy użyciu modelu wypychania lub modelu ściągania. W przypadku modelu wypychania procesor zestawienia zmian wypycha działanie do klienta, który ma logikę biznesową do przetwarzania tej pracy. Jednak złożoność sprawdzania pracy i przechowywania stanu ostatniej przetworzonej pracy jest obsługiwana w procesorze zestawienia zmian.
W przypadku modelu ściągania klient musi ściągnąć pracę z serwera. W takim przypadku klient ma logikę biznesową do przetwarzania pracy, a także przechowuje stan ostatniej przetworzonej pracy. Klient obsługuje równoważenie obciążenia w wielu klientach przetwarzania pracy równolegle i obsługuje błędy.
Uwaga
Zaleca się użycie modelu wypychania, ponieważ nie trzeba martwić się o sondowanie zestawienia zmian pod kątem przyszłych zmian, przechowywanie stanu dla ostatniej przetworzonej zmiany i innych korzyści.
Większość scenariuszy korzystających ze zestawienia zmian usługi Azure Cosmos DB używa jednej z opcji modelu wypychania. Istnieją jednak pewne scenariusze, w których można chcieć mieć dodatkową kontrolę niskiego poziomu nad modelem ściągania. Dodatkowa kontrola niskiego poziomu obejmuje:
- Odczytywanie zmian z określonego klucza partycji
- Kontrolowanie tempa, w którym klient otrzymuje zmiany do przetwarzania
- Jednorazowe odczytywanie istniejących danych w kanale zmian (na przykład w celu przeprowadzenia migracji danych)
Odczytywanie zestawienia zmian za pomocą modelu wypychania
Istnieją dwa sposoby odczytywania ze zestawienia zmian za pomocą modelu wypychania: wyzwalacze usługi Azure Functions w usłudze Azure Cosmos DB i biblioteka procesora zestawienia zmian. Usługa Azure Functions używa procesora zestawienia zmian w tle, więc są to oba podobne sposoby odczytywania zestawienia zmian. Usługa Azure Functions jest po prostu platformą hostingu dla procesora zestawienia zmian, a nie zupełnie innym sposobem odczytywania zestawienia zmian. Usługa Azure Functions używa procesora zestawienia zmian w tle. Automatycznie przetwarza zmiany w partycjach kontenera.
Azure Functions
Możesz utworzyć małe reaktywne funkcje usługi Azure Functions, które są automatycznie wyzwalane na każdym nowym zdarzeniu w kanale zmian kontenera usługi Azure Cosmos DB. Za pomocą wyzwalacza usługi Azure Functions dla usługi Azure Cosmos DB można użyć funkcji skalowania i niezawodnego wykrywania zdarzeń procesora zestawienia zmian bez konieczności obsługi infrastruktury procesu roboczego.
Procesor zestawienia zmian
Procesor zestawienia zmian jest częścią zestawów SDK platformy .NET usługi Azure Cosmos DB w wersji 3 i języka Java w wersji 4 . Upraszcza to proces odczytywania zestawienia zmian i efektywnego dystrybuowania przetwarzania zdarzeń między wieloma użytkownikami.
Są cztery główne składniki implementacji procesora zestawienia zmian:
Monitorowany kontener: monitorowany kontener zawiera dane, z których jest generowany kanał zmian. Wszystkie wstawienia i aktualizacje monitorowanego kontenera są odzwierciedlone w jego zestawieniu zmian.
Kontener dzierżawy: kontener dzierżawy działa jako magazyn stanu i koordynuje przetwarzanie zestawienia zmian między wieloma procesami roboczymi. Kontener dzierżawy może być przechowywany na tym samym koncie co monitorowany kontener lub na osobnym koncie.
Wystąpienie obliczeniowe: wystąpienie obliczeniowe hostuje procesor zestawienia zmian w celu nasłuchiwania zmian. W zależności od platformy może ona być reprezentowana przez maszynę wirtualną, zasobnik kubernetes, wystąpienie usługi aplikacja systemu Azure Service, rzeczywistą maszynę fizyczną. Ma on unikatowy identyfikator, do których odwołuje się nazwa wystąpienia w tym artykule.
Delegat: Delegat to kod, który definiuje, co ty, deweloper, chcesz zrobić z każdą partią zmian odczytanych przez procesor zestawienia zmian.
Podczas implementowania procesora zestawienia zmian punkt wejścia jest zawsze monitorowany kontener, z Container
wystąpienia, które wywołujesz GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Gdzie pierwszy parametr jest odrębną nazwą opisającą cel tego procesora, a drugą nazwą jest implementacja delegata, która obsługuje zmiany. Oto przykład delegata:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Następnie zdefiniujesz nazwę wystąpienia obliczeniowego lub unikatowy identyfikator WithInstanceName
za pomocą polecenia . Powinno to być unikatowe i inne w każdym wdrażanych wystąpieniach obliczeniowych, a na koniec jest to kontener do obsługi stanu dzierżawy za pomocą WithLeaseContainer
polecenia .
Wywołanie Build
daje wystąpienie procesora, które można rozpocząć, wywołując polecenie StartAsync
.
Normalny cykl życiowy wystąpienia hosta wygląda następująco:
- Odczyt zestawienia zmian.
- Jeśli nie ma żadnych zmian, uśpij wstępnie zdefiniowany czas (dostosowywalny za pomocą
WithPollInterval
polecenia w plikuBuilder
) i przejdź do pliku #1. - Jeśli istnieją zmiany, wysłanie ich do obiektu delegowanego.
- Po pomyślnym zakończeniu przetwarzania zmian przez obiekt delegowany następuje aktualizacja magazynu dzierżawy informacjami o ostatnim przetworzonym punkcie w czasie i przejście do punktu 1.