Partager via


Modèle de tirage (pull) du flux de modification dans Azure Cosmos DB

S’APPLIQUE À : NoSQL

Utilisez le modèle de tirage du flux de modification pour consommer le flux de modification Azure Cosmos DB à votre rythme. Comme avec le processeur de flux de modification, vous pouvez utiliser le modèle de tirage (pull) de flux de modification pour paralléliser le traitement des modifications entre plusieurs consommateurs de flux de modification.

Comparaison avec le processeur de flux de modification

De nombreux scénarios permettent de traiter le flux de modification à l’aide du processeur de flux de modification ou du modèle de tirage du flux de modification. Les jetons de continuation du modèle de tirage et le conteneur de bail du processeur de flux de modification agissent comme des signets pour le dernier élément (ou lot d’éléments) traité dans le flux de modification.

Toutefois, vous ne pouvez pas convertir les jetons de continuation en bail (ou inversement).

Remarque

Dans la plupart des cas, lorsque vous devez lire à partir du flux de modification, l’option la plus simple consiste à utiliser le processeur de flux de modification.

Envisagez plutôt d’utiliser le modèle de tirage dans les scénarios suivants :

  • Pour lire les modifications à partir d’une clé de partition spécifique.
  • Pour contrôler la vitesse à laquelle votre client reçoit les modifications à traiter.
  • Pour effectuer une lecture unique des données existantes dans le flux de modification (par exemple, pour effectuer une migration de données).

Voici certaines des différences capitales entre le processeur de flux de modification et le modèle de tirage du flux de modification :

Fonctionnalité Processeur de flux de modification Changer le modèle d’extraction de flux
Suivi du point actuel dans le traitement du flux de modification Bail (stocké dans un conteneur Azure Cosmos DB) Jeton de continuation (stocké en mémoire ou rendu persistant manuellement)
Possibilité de relire les modifications passées Oui, avec le modèle d’envoi (push) Oui, avec le modèle de tirage (pull)
Interrogation des modifications à venir Vérifie automatiquement les modifications en fonction de la valeur WithPollInterval spécifiée par l’utilisateur Manuel
Comportement dans lequel aucune nouvelle modification n’est apportée Attend automatiquement la valeur de WithPollInterval, puis relance la vérification Vous devez vérifier l’état et revérifier manuellement
Traitement des modifications depuis un conteneur entier Oui, avec parallélisation automatique sur plusieurs threads et machines qui consomment depuis le même conteneur Oui avec parallélisation manuelle à l’aide de FeedRange
Traitement des modifications à partir d’une unique clé de partition Non pris en charge Oui

Remarque

Contrairement à la lecture avec le processeur de flux de modification, vous devez gérer explicitement les cas sans aucune nouvelle modification lorsque vous utilisez le modèle de tirage.

Utilisation du modèle de tirage

Pour traiter le flux de modification avec le modèle de tirage, créez une instance FeedIterator. Lorsque vous créez un FeedIterator, vous devez définir une valeur ChangeFeedStartFrom obligatoire constituée de la position de départ pour la lecture des modifications et de la valeur FeedRange à utiliser. FeedRange offre une plage de valeurs de clé de partition qui spécifie les éléments pouvant être lus à partir du flux de modification en utilisant le FeedIterator en question. Vous devez également spécifier une valeur requise ChangeFeedMode pour le mode dans lequel vous souhaitez traiter les modifications : dernière version ou toutes les versions et suppressions. Utilisez ChangeFeedMode.LatestVersion ou ChangeFeedMode.AllVersionsAndDeletes pour indiquer le mode à utiliser pour lire le flux de modification. Quand vous utilisez le mode « Toutes les versions et suppressions », vous devez sélectionner une valeur de début de flux de modification égale à Now() ou correspondant à un jeton de continuation spécifique.

Vous pouvez, si vous le souhaitez, spécifier ChangeFeedRequestOptions pour définir un PageSizeHint. Quand cette propriété est définie, elle détermine le nombre maximal d’éléments reçus par page. Si des opérations de la collection supervisée sont effectuées par le biais de procédures stockées, l’étendue de transaction est conservée lors de la lecture d’éléments à partir du flux de changement. Ainsi, le nombre d’éléments reçus peut être supérieur à la valeur spécifiée, de sorte à renvoyer les éléments modifiés par une même transaction dans un même lot atomique.

Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie les objets d’entité, en l’occurrence un objet User :

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Conseil

Avant la version 3.34.0, le mode Dernière version peut être utilisé en définissant ChangeFeedMode.Incremental. Incremental et LatestVersion font référence au mode Dernière version du flux de modification, et les applications qui utilisent l’un ou l’autre mode voient le même comportement.

Le mode Toutes les versions et suppressions est en préversion et peut être utilisé avec les préversions >= 3.32.0-preview du kit SDK .NET. Voici un exemple d’obtention d’un FeedIterator dans le mode toutes les versions et suppressions qui retourne des objets User :

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Remarque

En mode « Dernière version », vous recevez les objets qui représentent l’élément modifié avec des métadonnées supplémentaires. Le mode « Toutes les versions et suppressions » renvoie un modèle de données différent. Pour plus d’informations, consultez la rubrique Analyse de l’objet de réponse.

Vous pouvez obtenir l’exemple complet pour le mode dernière version ou le mode toutes les versions et suppressions.

Consommation du flux de modification avec des flux

Avec les deux modes de flux de modification, le FeedIterator offre deux options. En plus des exemples qui retournent des objets d’entité, vous pouvez également obtenir la réponse avec la prise en charge de Stream. Les flux vous permettent de lire des données sans les désérialiser au préalable, et donc de les enregistrer sur les ressources clientes.

Voici un exemple qui décrit comment obtenir un FeedIterator en mode « Dernière version » qui renvoie un Stream :

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Consommation des modifications d’un conteneur entier

Si vous ne définissez pas de paramètre FeedRange pour un FeedIterator, vous pouvez traiter le flux de modification d’un conteneur entier à votre rythme. Voici un exemple qui permet de lire toutes les modifications à partir de l’heure actuelle en utilisant le mode« Dernière version » :

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Comme le flux de modification constitue effectivement une liste d’éléments infinie qui englobe toutes les écritures et mises à jour ultérieures, la valeur de HasMoreResults est toujours true. Lorsque vous essayez de lire le flux de modification et qu’aucune nouvelle modification n’est disponible, vous recevez une réponse avec l’état NotModified. Dans l’exemple précédent, il est géré en attendant cinq secondes avant de revérifier les modifications.

Consommation des modifications d’une clé de partition

Dans certains cas, vous pouvez traiter uniquement les modifications d’une clé de partition donnée. Vous pouvez obtenir un FeedIterator pour une clé de partition donnée et traiter les modifications comme vous le feriez pour un conteneur entier.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Utilisation de FeedRange pour la parallélisation

Dans le processeur de flux de modification, le travail est automatiquement réparti sur plusieurs consommateurs. Dans le modèle de tirage du flux de modification, vous pouvez utiliser FeedRange pour paralléliser le traitement du flux de modification. FeedRange représente une plage de valeurs de clé de partition.

Voici un exemple qui décrit comment obtenir une liste de plages pour votre conteneur :

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Lorsque vous obtenez la liste des valeurs FeedRange de votre conteneur, vous obtenez un FeedRange par partition physique.

Vous pouvez utiliser un FeedRange pour créer un FeedIterator afin de paralléliser le traitement du flux de modification sur plusieurs machines ou threads. Contrairement à l’exemple précédent qui illustrait comment obtenir un FeedIterator pour l’ensemble du conteneur ou pour une seule clé de partition, vous pouvez utiliser FeedRanges pour obtenir plusieurs FeedIterators qui peuvent traiter le flux de modification en parallèle.

Si vous souhaitez utiliser FeedRanges, vous devez disposer d’un processus d’orchestrateur qui obtient FeedRanges et les distribue à ces machines. Cette distribution peut être :

  • L’utilisation de FeedRange.ToJsonString et la distribution de cette valeur de chaîne. Les consommateurs peuvent utiliser cette valeur avec FeedRange.FromJsonString.
  • Le passage de la référence d’objet FeedRange si la distribution est en cours.

Voici un exemple qui décrit comment utiliser deux machines hypothétiques distinctes qui lisent en parallèle pour lire à partir du début du flux de modification du conteneur :

Machine 1 :

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Machine 2 :

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Enregistrement des jetons de continuation

Vous pouvez enregistrer la position de votre FeedIterator en obtenant le jeton de continuation. Un jeton de continuation est une valeur de chaîne qui assure le suivi des dernières modifications traitées de votre FeedIterator et autorise FeedIterator à reprendre plus tard. Le jeton de continuation, s’il est spécifié, est prioritaire sur l’heure de début et commence à partir des valeurs de début. Le code suivant lit le flux de modification depuis la création du conteneur. Lorsque plus aucune modification n’est disponible, un jeton de continuation est conservé pour que la consommation du flux de modification puisse être reprise plus tard.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

En mode« Dernière version », le jeton de continuation FeedIterator n’expire jamais tant que le conteneur Azure Cosmos DB existe. En mode « Toutes les versions et suppressions », le jeton de continuation FeedIterator est valide tant que les modifications surviennent durant la période de rétention des sauvegardes continues.

Étapes suivantes