共用方式為


Azure Cosmos DB 中的變更摘要處理器

適用於:NoSQL

變更摘要處理器是 Azure Cosmos DB .NET V3Java V4 SDK 的一部分。 這可以簡化讀取變更摘要的程序,並有效將事件處理散發給多個取用者。

使用變更摘要處理器的主要優點是其容錯設計,可確保「至少一次」傳遞變更摘要中的所有事件。

支援的 SDK

.Net V3 Java Node.JS Python

變更摘要處理器的元件

變更摘要處理器有四個主要元件:

  • 受監視的容器:受監視的容器含有會產生變更摘要的資料。 對於受監視的容器所進行的任何插入和更新,都會反映在容器的變更摘要中。

  • 租用容器:租用容器作為狀態儲存體,可協調如何處理多個背景工作角色的變更摘要。 租用容器可以儲存在與受監視容器相同的帳戶中,或儲存在個別帳戶中。

  • 計算執行個體:計算執行個體主控變更摘要處理器以接聽變更。 根據平台,這可能是由虛擬機器 (VM)、Kubernetes Pod、Azure App Service 執行個體或實際實體機器來表示。 計算執行個體具有本文中稱為「執行個體名稱」的唯一識別碼。

  • 委派:委派是一種程式碼,定義身為開發人員的您想要對變更摘要處理器讀取的每個批次變更進行哪些動作。

為了進一步瞭解變更摘要處理器的這四個元素如何一起運作,請看下圖中的範例。 受監視的容器會儲存項目,並使用 'City' 作為分割區索引鍵。 分割區索引鍵值會散發至包含項目的範圍 (每個範圍都代表一個實體分割區)。

此圖顯示兩個計算執行個體,而且變更摘要處理器將不同的範圍指派給每個執行個體,以將計算散發最大化。 每個執行個體都有不同的唯一名稱。

每個範圍都會平行進行讀取。 範圍的進度會透過「租用」文件與租用容器中的其他範圍各自維護。 租用的組合代表變更摘要處理器的目前狀態。

變更摘要處理器範例

實作變更摘要處理器

.NET 中的變更摘要處理器適用於最新版本模式所有版本和刪除模式。 所有版本和刪除模式處於預覽狀態,且支援從版本 3.40.0-preview.0 開始的變更摘要處理器。 這兩種模式的進入點一律是受監視的容器。

若要使用最新版本模式進行讀取,請在 Container 執行個體中呼叫 GetChangeFeedProcessorBuilder

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

若要使用所有版本和刪除模式讀取,請從 Container 執行個體呼叫 GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

針對兩種模式,第一個參數是可描述此處理器目標的不同名稱。 第二個名稱是可處理變更的委派實作。

以下是最新版本模式的委派範例:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

以下是所有版本和刪除模式的委派範例:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

之後,您可以使用 WithInstanceName 來定義計算執行個體名稱或唯一識別碼。 針對您要部署的每個計算執行個體,計算執行個體名稱應該唯一且不同。 您可以使用 WithLeaseContainer 來設定容器以維護租用狀態。

呼叫 Build 提供可藉由呼叫 StartAsync 來啟動的處理器執行個體。

注意

先前的程式碼片段取自 GitHub 中的範例。 您可以使用最新版本模式所有版本和刪除模式的範例。

處理生命週期

主機執行個體的正常生命週期為:

  1. 讀取變更摘要。
  2. 如果沒有任何變更,則睡眠一段預先定義的時間 (可使用建立器中的 WithPollInterval 自訂),然後移至 #1。
  3. 如果有變更,則將其傳送至委派
  4. 當委派成功完成處理變更時,請以最新的處理時間點更新租用存放區,並移至 #1。

錯誤處理

變更摘要處理器具有使用者程式碼錯誤的復原性。 如果您的委派實作有未處理的例外狀況 (步驟 #4),則會停止處理該特定批次變更的執行緒,最後建立新的執行緒。 新的執行緒會檢查租用存放區已針對該範圍的分割區索引鍵值所儲存的最新時間點。 新的執行緒會從該處重新啟動,以有效地將相同批次的變更傳送給委派。 除非委派正確地處理變更,否則此行為會繼續,這也是變更摘要處理器保證「至少一次」的原因。

注意

整批變更只有在一種情況下才不會淘汰。 如果第一次執行委派時失敗,則租用存放區沒有先前儲存的狀態可用於重試。 在這些情況下,重試會使用初始啟動設定,這可能或可能不會包括最後一個批次。

為避免變更摘要處理器「停滯」在持續重試相同的變更批次,建議在您的委派程式碼中新增邏輯,以在例外狀況時,將文件寫入錯誤訊息佇列。 這項設計可確保您可以追蹤未處理的變更,同時仍能繼續處理未來的變更。 錯誤訊息佇列可能是另一個 Azure Cosmos DB 容器。 確切的資料存放區並不重要。 您只想要持續保存未處理的變更。

您也可以使用變更摘要估算器來監視變更摘要處理器執行個體讀取變更摘要的進度,也可以使用生命週期通知來偵測基礎失敗。

生命週期通知

您可以將變更摘要處理器連線至其生命週期中的任何相關事件。 您可以選擇收到其中一項或所有項目的通知。 建議至少註冊錯誤通知:

  • 註冊 WithLeaseAcquireNotification 的處理常式,以便在目前的主機取得租用來開始處理時收到通知。
  • 註冊 WithLeaseReleaseNotification 的處理常式,以便在目前的主機釋放租用並停止處理時收到通知。
  • 註冊 WithErrorNotification 的處理常式,以在目前的主機於處理期間發生例外狀況時收到通知。 您需要能夠辨別來源是否為使用者委派 (未處理的例外狀況),或處理器在嘗試存取受監視容器時遇到的錯誤 (例如網路問題)。

這兩種變更摘要模式都可以使用生命週期通知。 以下是最新版本模式的生命週期通知範例:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

部署單位

單一變更摘要處理器部署單位是由一或多個具有相同 processorName 值和租用容器設定的計算執行個體所組成,但執行個體名稱都不同。 您可以有許多部署單位,其中每個單位都有不同的商務流程來進行變更,而每個部署單位都包含一或多個執行個體。

例如,您可能會有一個部署單位,每次您的容器有變更時,就會觸發外部 API。 每次有變更時,另一個部署單位可能會即時移動資料。 受監視容器中發生變更時,您所有的部署單位都會收到通知。

動態調整

如稍早所述,您可以在部署單位內有一或多個計算執行個體。 若要利用部署單位內的計算散發,唯一的主要需求是:

  • 所有執行個體都應該具有相同的租用容器設定。
  • 所有執行個體都應該具有相同的 processorName 值。
  • 每個執行個體都必須有不同的執行個體名稱 (WithInstanceName)。

如果這三個條件都適用,則變更摘要處理器會使用相等的散發演算法,將租用容器中的所有租用都散發到該部署單位的所有執行中執行個體,並平行處理計算。 在任何時間,一個執行個體會擁有一個租用,因此執行個體數目不應該大於租用數目。

執行個體數目可以擴大和縮小。 變更摘要處理器會據此轉散發來動態調整負載。

此外,如果容器的輸送量或儲存體增加,則變更摘要處理器可以動態調整容器規模。 當您的容器成長時,變更摘要處理器會動態增加租用,並在現有的執行個體之間散發新的租用,以透明的方式處理該情節。

開始時間

變更摘要處理器第一次啟動時,預設會初始化租用容器,並開始其處理生命週期。 偵測不到變更摘要處理器第一次初始化之前受監視容器中發生的任何變更。

從上一個日期和時間讀取

DateTime 的執行個體傳遞至 WithStartTime 建立器延伸模組,即可初始化從「特定日期和時間」開始讀取變更的變更摘要處理器:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

變更摘要處理器會針對該特定日期和時間進行初始化,並開始讀取其後發生的變更。

從頭開始讀取

在其他情節中 (例如在資料移轉中,或分析容器的整個歷程記錄時),您需要從「該容器存留期的開頭」讀取變更摘要。 您可以在建立器延伸模組上使用 WithStartTime,但傳遞 DateTime.MinValue.ToUniversalTime(),這會產生最小 DateTime 值的 UTC 表示法,如此範例所示:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

變更摘要處理器會進行初始化,並從容器存留期的開頭開始讀取變更。

注意

這些自訂選項只能用來設定變更摘要處理器的開始時間點。 第一次初始化租用容器之後,變更這些選項不會有任何作用。

自訂起點僅適用於最新版本變更摘要模式。 使用所有版本和刪除模式時,您必須從處理器啟動時開始讀取,或從帳戶的連續備份保留期間內的先前租用狀態繼續執行。

變更摘要和佈建的輸送量

受監視容器上的變更摘要讀取作業會使用要求單位。 請確定受監視容器未遇到節流。 節流會增加在處理器上接收變更摘要事件的延遲。

租用容器上的作業 (更新和維護狀態) 會取用要求單位。 使用相同租用容器的執行個體數目越多,要求單位的潛在使用量就越高。 請確定租用容器未遇到節流。 節流會增加接收變更摘要事件的延遲。 節流甚至可以完全結束處理。

共用租用容器

您可以跨多個部署單位來共用租用容器。 在共用租用容器中,每個部署單位都會接聽不同的受監視容器,或針對 processorName 有不同的值。 在此設定中,每個部署單位都會維護租用容器上的獨立狀態。 檢閱租用容器上的要求單位使用量,以確定佈建的輸送量足以容納所有部署單位。

進階租用設定

三個主要設定可能會影響變更摘要處理器的運作方式。 每個設定都會影響租用容器上的要求單位使用量。 您可以在建立變更摘要處理器時設定下列其中一個設定,但請小心使用:

  • 租用取得:預設為每 17 秒。 主機會定期檢查租用存放區的狀態,並考慮在動態調整程序期間取得租用。 此程序是透過在租用容器上執行查詢來完成。 減少此值可加快重新平衡和取得租用的速度,但會增加租用容器上的要求單位使用量
  • 租用到期:預設為 60 秒。 定義租用在由另一部主機取得之前可以存在且沒有任何續約活動的時間量上限。 主機當機時,其他主機會在這段期間加上所設定的續約間隔之後挑選其所擁有的租用。 減少此值會在主機當機後更快速復原,但到期值絕對不應該低於續約間隔。
  • 租用續約:預設為每 13 秒。 擁有租用的主機會定期將租用續約,即使沒有可使用的新變更也是一樣。 此程序是透過在租用上執行「取代」來完成。 減少此值會降低偵測主機當機所遺失租用所需的時間,但會增加租用容器上的要求單位使用量

變更摘要處理器的託管位置

在支援長時間執行程序或工作的任何平台中,可以託管變更摘要處理器。 以下列出一些範例:

雖然變更摘要處理器可以在短期環境中執行,但因為租用容器會維護狀態,所以這些環境的啟動週期會增加收到通知所需的時間延遲 (因為每次啟動環境時,啟動處理器需要額外時間)。

角色型存取要求

使用 Microsoft Entra ID 作為驗證機制時,請確保身分具有適當的權限

  • 在受監視的容器上:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • 在租用容器上:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

其他資源

下一步

深入了解下列文章中的變更摘要處理器: