次の方法で共有


Azure Cosmos DB for NoSQL と変更フィード プロセッサを使用する Java アプリケーションを作成する方法

適用対象: NoSQL

Azure Cosmos DB は、Microsoft が提供するフル マネージドの NoSQL データベース サービスです。 これにより、グローバルに分散され、拡張性の高いアプリケーションを簡単に構築できます。 この攻略ガイドでは、Azure Cosmos DB for NoSQL データベースを使用し、リアルタイム データ処理用の変更フィード プロセッサを実装する Java アプリケーションを作成するプロセスについて説明します。 Java アプリケーションは、Azure Cosmos DB Java SDK v4 を使用して、Azure Cosmos DB for NoSQL と通信します。

重要

このチュートリアルは、Azure Cosmos DB Java SDK v4 のみを対象としています。 詳細については、Azure Cosmos DB Java SDK v4 リリース ノートMaven リポジトリAzure Cosmos DB の変更フィード プロセッサ、Azure Cosmos DB Java SDK v4 トラブルシューティング ガイドを参照してください。 v4 より前のバージョンを現在使用している場合、v4 にアップグレードするには、Azure Cosmos DB Java SDK v4 ガイドを参照してください。

前提条件

  • Azure Cosmos DB アカウント: Azure portalから作成することも、Azure Cosmos DB Emulator を使用することもできます。

  • Java 開発環境: 少なくとも 8 つのバージョンの Java Development Kit (JDK) がマシンにインストールされていることを確認します。

  • Azure Cosmos DB Java SDK V4: Azure Cosmos DB と対話するために必要な機能を提供します。

背景

Azure Cosmos DB の変更フィードには、多くのユーザーがいるドキュメントの挿入に応答してアクションをトリガーするイベントドリブン インターフェイスが用意されています。

変更フィードのイベントを管理する作業は、主に、SDK に組み込まれている変更フィード プロセッサ ライブラリによって行われます。 このライブラリは、必要に応じて変更フィードのイベントを複数のワーカー間に配布するのに十分な性能を備えています。 変更フィード ライブラリにコールバックを提供するだけで利用できます。

この Java アプリケーションの簡単な例は、Azure Cosmos DB と変更フィード プロセッサを使用したリアルタイム データ処理を示しています。 アプリケーションは、データ ストリームをシミュレートするために、サンプル ドキュメントを "フィード コンテナー" に挿入します。 フィード コンテナーにバインドされた変更フィード プロセッサは、受信した変更を処理し、ドキュメント コンテンツをログに記録します。 プロセッサは並列処理のリースを自動的に管理します。

ソース コード

SDK サンプル リポジトリを複製し、SampleChangeFeedProcessor.java でこの例を見つけることができます:

git clone https://github.com/Azure-Samples/azure-cosmos-java-sql-api-samples.git
cd azure-cosmos-java-sql-api-sample/src/main/java/com/azure/cosmos/examples/changefeed/

チュートリアル

  1. Azure Cosmos DB と Azure Cosmos DB Java SDK V4 を使用して Java アプリケーションで ChangeFeedProcessorOptions を構成します。 ChangeFeedProcessorOptions は、データ処理中の変更フィード プロセッサの動作を制御するための重要な設定を提供します。

    options = new ChangeFeedProcessorOptions();
    options.setStartFromBeginning(false);
    options.setLeasePrefix("myChangeFeedDeploymentUnit");
    options.setFeedPollDelay(Duration.ofSeconds(5));
    options.setFeedPollThroughputControlConfig(throughputControlGroupConfig);
    
  2. ホスト名、フィード コンテナー、リース コンテナー、データ処理ロジックなど、関連する構成を使用して ChangeFeedProcessor を初期化します。 start() メソッドはデータ処理を開始し、フィード コンテナーからの受信データ変更の同時およびリアルタイム処理を可能にします。

    logger.info("Start Change Feed Processor on worker (handles changes asynchronously)");
    ChangeFeedProcessor changeFeedProcessorInstance = new ChangeFeedProcessorBuilder()
        .hostName("SampleHost_1")
        .feedContainer(feedContainer)
        .leaseContainer(leaseContainer)
        .handleChanges(handleChanges())
        .options(options)
        .buildChangeFeedProcessor();
    changeFeedProcessorInstance.start()
                               .subscribeOn(Schedulers.boundedElastic())
                               .subscribe();
    
  3. デリゲートは、handleChanges() メソッドを使用して受信データの変更を処理するように指定します。 メソッドは、変更フィードから受信した JsonNode ドキュメントを処理します。 開発者は、変更フィードによって提供される JsonNode ドキュメントを処理するための 2 つのオプションがあります。 1 つのオプションは、JsonNode の形式でドキュメントを操作することです。 これは、すべてのドキュメントに対して 1 つの統一されたデータ モデルがない場合に特に便利です。 2 番目のオプション - JsonNode を JsonNode と同じ構造の POJO に変換します。 その後、POJO を操作できます。

    private static Consumer<List<JsonNode>> handleChanges() {
        return (List<JsonNode> docs) -> {
            logger.info("Start handleChanges()");
    
            for (JsonNode document : docs) {
                try {
                    //Change Feed hands the document to you in the form of a JsonNode
                    //As a developer you have two options for handling the JsonNode document provided to you by Change Feed
                    //One option is to operate on the document in the form of a JsonNode, as shown below. This is great
                    //especially if you do not have a single uniform data model for all documents.
                    logger.info("Document received: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
                            .writeValueAsString(document));
    
                    //You can also transform the JsonNode to a POJO having the same structure as the JsonNode,
                    //as shown below. Then you can operate on the POJO.
                    CustomPOJO2 pojo_doc = OBJECT_MAPPER.treeToValue(document, CustomPOJO2.class);
                    logger.info("id: " + pojo_doc.getId());
    
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
            isWorkCompleted = true;
            logger.info("End handleChanges()");
    
        };
    }
    
  4. Java アプリケーションをビルドして実行します。 アプリケーションが変更フィード プロセッサを起動し、フィード コンテナーにサンプル ドキュメントを挿入し、受信した変更を処理します。

まとめ

このガイドでは、Azure Cosmos DB for NoSQL データベースを使用し、リアルタイムデータ処理に変更フィード プロセッサを使用する Azure Cosmos DB Java SDK V4 を使用して Java アプリケーションを作成する方法について説明しました。 このアプリケーションを拡張して、より複雑なユース ケースを処理し、Azure Cosmos DB を使用して堅牢でスケーラブルでグローバルに分散されたアプリケーションを構築できます。

その他の技術情報

次のステップ

以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます: