Condividi tramite


Modello di pull del feed di modifiche in Azure Cosmos DB

SI APPLICA A: NoSQL

Il modello di pull del feed di modifiche consente di utilizzare il feed di modifiche di Azure Cosmos DB in modo personalizzato. Analogamente al processore di feed di modifiche, è possibile usare il modello di pull del feed di modifiche per parallelizzare l'elaborazione delle modifiche tra più consumer del feed di modifiche.

Confronto con il processore del feed di modifiche

In molti scenari è possibile elaborare il feed di modifiche usando sia il processore del feed di modifiche o il modello di pull del feed di modifiche. I token di continuazione del modello di pull e il contenitore lease del processore del feed di modifiche funzionano entrambi come "segnalibri" dell'ultimo elemento o batch di elementi elaborati nel feed di modifiche.

Tuttavia, non è possibile convertire i token di continuazione in un contenitore lease o viceversa.

Nota

Nella maggior parte dei casi, quando è necessario leggere dal feed di modifiche, l'opzione più semplice consiste nell'usare il processore del feed di modifiche.

È opportuno usare il modello di pull negli scenari seguenti:

  • Per leggere le modifiche per una chiave di partizione specifica.
  • Per controllare la velocità di ricezione delle modifiche per l'elaborazione da parte del client.
  • Per eseguire una sola lettura dei dati esistenti nel feed di modifiche (ad esempio, per eseguire una migrazione di dati).

Di seguito sono riportate alcune differenze fondamentali tra il modello di pull del feed di modifiche e il processore del feed di modifiche.

Funzionalità Processore dei feed di modifiche Modello di pull del feed di modifiche
Tenere traccia del punto corrente nell'elaborazione del feed di modifiche Lease (archiviato in un contenitore Azure Cosmos DB) Token di continuazione (archiviato in memoria o reso persistente manualmente)
Possibilità di riprodurre modifiche precedenti Sì, con modello di push Sì, con modello di pull
Polling per le modifiche future Verificare automaticamente la presenza di modifiche in base al parametro WithPollInterval specificato dall'utente Manuale
Comportamento in cui non sono presenti nuove modifiche Attendere automaticamente il valore per WithPollInterval quindi ricontrollare È necessario controllare lo stato e ricontrollare manualmente
Elaborare le modifiche da un intero contenitore Sì, è parallelizzato automaticamente tra più thread e computer che utilizzano lo stesso contenitore Sì, è parallelizzato manualmente tramite FeedRange
Elaborare le modifiche da una sola chiave di partizione Non supportato

Nota

Quando si usa il modello di pull, a differenza della lettura tramite il processore del feed di modifiche, è necessario gestire in modo esplicito i casi in cui non sono presenti nuove modifiche.

Usare il modello di pull

Per elaborare il feed di modifiche usando il modello di pull, creare un'istanza di FeedIterator. Quando si crea inizialmente FeedIterator, è necessario specificare un valore obbligatorio ChangeFeedStartFrom, costituito sia dalla posizione iniziale per la lettura delle modifiche che dal valore da utilizzare per FeedRange. L'oggetto FeedRange è un intervallo di valori di chiave di partizione e specifica gli elementi che possono essere letti dal feed di modifiche usando tale specifico FeedIterator . È inoltre necessario specificare un valore obbligatorio ChangeFeedMode per la modalità in cui si desidera elaborare le modifiche: versione più recente o tutte le versioni ed eliminazioni. Usare ChangeFeedMode.LatestVersion o ChangeFeedMode.AllVersionsAndDeletes per indicare la modalità che si desidera usare per leggere il feed di modifiche. Quando si usano tutte le versioni ed elimina la modalità, è necessario selezionare un avvio del feed di modifiche dal valore di Now() o da un token di continuazione specifico.

È possibile specificare ChangeFeedRequestOptions per impostare un oggetto PageSizeHint. Se impostata, questa proprietà imposta il numero massimo di elementi ricevuti per pagina. Se le operazioni nella raccolta monitorata vengono eseguite tramite stored procedure, l'ambito della transazione viene mantenuto durante la lettura degli elementi dal feed di modifiche. Di conseguenza, il numero di elementi ricevuti potrebbe essere superiore al valore specificato in modo che gli elementi modificati dalla stessa transazione vengano restituiti come parte di un batch atomico.

Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce oggetti di entità, in questo caso un oggetto User:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Suggerimento

Prima della versione 3.34.0, è possibile usare la modalità versione più recente impostando ChangeFeedMode.Incremental. Sia Incremental che LatestVersion fanno riferimento alla modalità versione più recente del feed di modifiche e le applicazioni che usano entrambe le modalità visualizzeranno lo stesso comportamento.

Tutte le versioni ed elimina la modalità sono in anteprima e possono essere utilizzate con le versioni di .NET SDK di anteprima >= 3.32.0-preview. Ecco un esempio che mostra come ottenere FeedIterator nella modalità Tutte le versioni ed eliminazioni che restituisce oggetti User:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Nota

Nella modalità versione più recente si ricevono oggetti che rappresentano l'elemento modificato, con alcuni metadati aggiuntivi. Tutte le versioni ed elimina la modalità restituiscono un modello di dati diverso. Per ottenere ulteriori informazioni, consultare Analizzare l'oggetto response.

È possibile ottenere l'esempio completo per la modalità Versione più recente o quella Tutte le versioni e le eliminazioni.

Usare il feed di modifiche tramite flussi

FeedIterator per entrambe le modalità del feed di modifiche sono disponibili due opzioni. Oltre agli esempi che restituiscono oggetti entità, è anche possibile ottenere la risposta con il supporto Stream. Stream consente di leggere i dati senza prima deserializzarli e quindi di risparmiare risorse del client.

Ecco un esempio che mostra come ottenere un oggetto FeedIterator in modalità versione più recente che restituisce Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Usare le modifiche per un intero contenitore

Se non si fornisce un parametro FeedRange a FeedIterator, è possibile elaborare il feed di modifiche di un intero contenitore in modo personalizzato. L'esempio riportato di seguito inizia a leggere tutte le modifiche a partire dall'ora corrente, usando la modalità versione più recente:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Poiché il feed di modifiche è effettivamente un elenco infinito di elementi che includono tutte le scritture e gli aggiornamenti futuri, il valore di HasMoreResults è sempre true. Quando si tenta di leggere il feed di modifiche e non sono disponibili nuove modifiche, si riceve una risposta con stato NotModified. Nell'esempio precedente si attendono cinque secondi prima di controllare nuovamente se sono presenti modifiche.

Usare le modifiche per una chiave di partizione

In alcuni casi, è possibile elaborare solo le modifiche per una chiave di partizione specifica. È possibile ottenere un oggetto FeedIterator per una chiave di partizione specifica ed elaborare le modifiche come si farebbe per un intero contenitore.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Usare FeedRange per la parallelizzazione

Nel processore del feed di modifiche il lavoro viene distribuito automaticamente tra più consumer. Nel modello di pull del feed di modifiche è possibile usare FeedRange per impostare l'elaborazione parallela del feed di modifiche. Un elemento FeedRange rappresenta un intervallo di valori di chiave di partizione.

Ecco un esempio che illustra come ottenere un elenco di intervalli per il contenitore:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Quando si ottiene un elenco di valori FeedRange per il contenitore, si otterrà una FeedRange per partizione fisica.

Con FeedRange è quindi possibile creare un oggetto FeedIterator per parallelizzare il feed di modifiche tra più computer o thread. A differenza dell'esempio precedente in cui è stato illustrato come ottenere un oggetto FeedIterator per l'intero contenitore o una singola chiave di partizione, è possibile usare FeedRanges per ottenere più oggetti FeedIterator, che possono elaborare il feed di modifiche in parallelo.

Nel caso in cui si intenda usare oggetti FeedRange, è necessario disporre di un processo dell'agente di orchestrazione che ottenga oggetti FeedRange e li distribuisca a tali computer. Questa distribuzione potrebbe essere:

  • Uso di FeedRange.ToJsonString e distribuzione di questo valore stringa. I consumer possono usare questo valore con FeedRange.FromJsonString.
  • Se la distribuzione è in-process, passaggio del riferimento all'oggetto FeedRange.

Ecco un esempio che illustra come leggere dall'inizio del feed di modifiche del contenitore usando due ipotetici computer distinti che eseguono la lettura in parallelo:

Computer 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computer 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Salvare i token di continuazione

È possibile salvare la posizione dell'utente FeedIterator ottenendo un token di continuazione. Un token di continuazione è un valore stringa che tiene traccia delle ultime modifiche elaborate di FeedIterator e che consente a FeedIterator di riprendere dallo stesso punto in un secondo momento. Il token di continuazione, se specificato, avrà la priorità sull'ora di inizio e su inizia dai valori iniziali. Il codice seguente leggerà tramite il feed di modifiche dopo la creazione del contenitore. Quando non sono più disponibili modifiche, verrà salvato in modo permanente un token di continuazione che consentirà di riprendere il consumo del feed di modifiche in un secondo momento.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Quando si usa la modalità versione più recente, il token di continuazione FeedIterator non scade finché è presente il contenitore Azure Cosmos DB. Quando si usa tutte le versioni ed elimina la modalità, il token di continuazione FeedIterator è valido purché le modifiche siano state apportate all'interno della finestra di conservazione per i backup continui.

Passaggi successivi