次の方法で共有


Spring Cloud Azure サポート for Spring Integration

この記事の対象: ✔️ バージョン 4.14.0 ✔️ バージョン 5.8.0

Spring Integration Extension for Azure には、Azure SDK for Java によって提供されるさまざまなサービス用の Spring Integration アダプターが用意されています。 Event Hubs、Service Bus、Storage キューの各 Azure サービスに対する Spring Integration のサポートが提供されています。 サポート対象のアダプターの一覧は次のとおりです。

Spring と Azure Event Hubs の統合

主要な概念

Azure Event Hubs は、ビッグ データのストリーミング プラットフォームとなるイベント インジェスト サービスです。 1 秒間に何百万ものイベントを受信して処理することができます。 イベント ハブに送信されたデータは、任意のリアルタイム分析プロバイダーやバッチ処理/ストレージ アダプターを使用して、変換および保存できます。

Spring 統合を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。 それらのアダプターは、リモート処理、メッセージング、スケジュール設定に対して Spring のサポートより高いレベルの抽象化を提供します。 Spring Integration for Event Hubs 拡張機能プロジェクトは、Azure Event Hubs の送受信チャネル アダプターとゲートウェイを提供します。

Note

RxJava サポート API はバージョン 4.0.0 から削除されます。 詳しくは Javadoc をご覧ください。

コンシューマー グループ

Event Hubs では、Apache Kafka と同様のコンシューマー グループのサポートが提供されますが、ロジックは若干異なります。 Kafka はコミットされたすべてのオフセットをブローカーに格納しますが、ユーザーは手動で処理される Event Hubs メッセージのオフセットを格納する必要があります。 Azure Storage 内にこのようなオフセットを格納する関数が、Event Hubs SDK で提供されています。

パーティション分割のサポート

Event Hubs には、Kafka と同様の物理パーティションの概念が用意されています。 ただし、Kafka によるコンシューマーとパーティションの間の自動再調整とは異なり、Event Hubs には一種のプリエンプティブ モードが用意されています。 ストレージ アカウントは、どのパーティションがどのコンシューマーによって所有されているかを判断するためのリースとして機能します。 新しいコンシューマーが起動すると、負荷の高いコンシューマーからいくつかのパーティションを盗んでワークロードの分散を実現しようとします。

負荷分散戦略を指定するため、開発者は構成に EventHubsContainerProperties を使用できます。 EventHubsContainerProperties を構成する方法の例については、後のセクションを参照してください。

Batch コンシューマーのサポート

EventHubsInboundChannelAdapter はバッチ消費モードをサポートしています。 ユーザーがこれを有効にするには、EventHubsInboundChannelAdapter インスタンスを構築するときに、リスナー モードを ListenerMode.BATCH と指定できます。 有効にすると、バッチ処理されたイベントのリストがペイロードである Message が受信されて、ダウンストリーム チャネルに渡されます。 各メッセージ ヘッダーもリストとして変換されます。その内容は各イベントから解析された関連ヘッダー値です。 パーティション ID、チェックポイント、および最後にエンキューされたプロパティの共有ヘッダーの場合、イベントのバッチ全体で同じ値が共有されます。 詳細については、「Event Hubs メッセージ ヘッダー」セクションを参照してください

Note

チェックポイント ヘッダーは、MANUAL チェックポイント モードが使用されている場合にのみ存在します。

バッチ コンシューマーのチェックポイント処理では、BATCHMANUAL の 2 つのモードがサポートされています。 BATCH モードは、受信されたイベントのバッチ全体を一緒にチェックポイント処理する自動チェックポイント モードです。 MANUAL モードでは、ユーザーがイベントをチェックポイント処理します。 使用すると、Checkpointer がメッセージ ヘッダーに渡され、ユーザーはそれを使用してチェックポイント処理を行うことができます。

バッチ消費ポリシーは、プロパティ max-sizemax-wait-time で指定できます。max-size は必須プロパティですが、max-wait-time は省略可能です。 バッチ消費戦略を指定するには、開発者は構成に EventHubsContainerProperties を使用できます。 EventHubsContainerProperties を構成する方法の例については、後のセクションを参照してください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

構成

このスターターでは、構成オプションの次の 3 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Event Hubs への接続に使用される構成オプションが含まれます。

Note

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。

spring-cloud-azure-starter-integration-eventhubs の構成可能な接続プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.eventhubs.enabled boolean Azure Event Hubs が有効になっているかどうか。
spring.cloud.azure.eventhubs.connection-string String Event Hubs 名前空間の接続文字列の値。
spring.cloud.azure.eventhubs.namespace String Event Hubs 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.eventhubs.domain-name String Azure Event Hubs 名前空間の値のドメイン名。
spring.cloud.azure.eventhubs.custom-endpoint-address String カスタム エンドポイント アドレス。
spring.cloud.azure.eventhubs.shared-connection Boolean 基になる EventProcessorClient と EventHubProducerAsyncClient で同じ接続を使用するかどうか。 既定では、作成される Event Hub クライアントごとに、新しい接続が構築されて使用されます。

チェックポイント構成プロパティ

このセクションには、パーティションの所有権とチェックポイントの情報を保持するために使用される、Storage Blob サービスの構成オプションが含まれます。

Note

バージョン 4.0.0 以降、spring.cloud.azure.eventhubs.processor.チェック のプロパティpoint-store.create-container-if-not-exists は手動で有効になっていません。ストレージ コンテナーは自動的に作成されません。

spring-cloud-azure-starter-integration-eventhubs の構成可能なチェックポイント プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean 存在しない場合にコンテナーの作成を許可するかどうか。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name String ストレージ アカウントの名前。
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String ストレージ アカウント アクセス キー。
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String ストレージ コンテナー名。

一般的な Azure Service SDK 構成オプションは、Storage Blob チェックポイント ストアに対しても構成できます。 サポートされている構成オプションは Spring Cloud Azure 構成導入され、統合プレフィックスspring.cloud.azure.またはspring.cloud.azure.eventhubs.processor.checkpoint-storeプレフィックスを使用して構成できます。

イベント ハブ プロセッサの構成プロパティ

EventHubsInboundChannelAdapterEventProcessorClient を使用してイベント ハブからのメッセージを使用し、EventProcessorClient の全体的なプロパティを構成します。開発者は、EventHubsContainerProperties を構成に使用できます。 EventHubsInboundChannelAdapter の使用方法については、後のセクションを参照してください。

基本的な使用方法

Azure Event Hubs にメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  1. Bean を使用してEventHubsTemplate作成DefaultMessageHandlerし、Event Hubs にメッセージを送信します。

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Event Hubs からのメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ チャネルの Bean を作成します。

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Bean を使用してEventHubsMessageListenerContainer作成EventHubsInboundChannelAdapterし、Event Hubs からメッセージを受信します。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 前に作成したメッセージ チャネルを使用して、EventHubsInboundChannelAdapter でメッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

EventHubsMessageConverter を構成して objectMapper をカスタマイズする

ユーザーが ObjectMapper をカスタマイズできるように、EventHubsMessageConverter は構成可能な Bean として作成されます。

Batch コンシューマーのサポート

Event Hubs からのメッセージのバッチでの使用は、上記のサンプルと似ていますが、ユーザーは EventHubsInboundChannelAdapter のバッチ消費関連の構成オプションを設定する必要があります。

EventHubsInboundChannelAdapter を作成するときは、リスナー モードを BATCH に設定する必要があります。 EventHubsMessageListenerContainer の Bean を作成するときは、チェックポイント モードを MANUAL または BATCH として設定し、バッチ オプションを必要に応じて構成できます。

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Event Hubs メッセージ ヘッダー

次の表では、Event Hubs のメッセージ プロパティを Spring メッセージ ヘッダーにマップする方法を示します。 Azure Event Hubs では、メッセージは event として呼び出されます。

レコード リスナー モードでの、Event Hubs のメッセージおよびイベントのプロパティと、Spring のメッセージ ヘッダーのマッピング:

Event Hubs イベントのプロパティ Spring Message ヘッダー定数 説明
エンキューされた時刻 EventHubsHeaders#ENQUEUED_TIME 即時 イベントがイベント ハブ パーティションにエンキューされた日時 (UTC)。
オフセット EventHubsHeaders#OFFSET Long 関連付けられたイベント ハブ パーティションからイベントが受信されたときのオフセット。
パーティション キー AzureHeaders#PARTITION_KEY String 最初にイベントを発行するときに設定された場合は、パーティション ハッシュ キー。
パーティション ID AzureHeaders#RAW_PARTITION_ID String イベント ハブのパーティション ID。
Sequence number EventHubsHeaders#SEQUENCE_NUMBER Long 関連付けられたイベント ハブ パーティションにエンキューされたときにイベントに割り当てられたシーケンス番号。
最後にエンキューされたイベント プロパティ EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties このパーティション内の最後にエンキューされたイベントのプロパティ。
NA AzureHeaders#CHECKPOINTER Checkpointer 特定のメッセージをチェックポイント処理するためのヘッダー。

ユーザーは、各イベントの関連情報のメッセージ ヘッダーを解析できます。 イベントのメッセージ ヘッダーを設定するために、カスタマイズされたすべてのヘッダーがイベントのアプリケーション プロパティとして配置されます。ここで、ヘッダーはプロパティ キーとして設定されます。 Event Hubs からイベントを受信すると、すべてのアプリケーション プロパティがメッセージ ヘッダーに変換されます。

Note

パーティション キー、エンキューされた時刻、オフセット、シーケンス番号のメッセージ ヘッダーは、手動で設定することはできません。

バッチ コンシューマー モードを有効にすると、バッチ処理されたメッセージの特定のヘッダーが次のように表示されます。これには、各 Event Hubs イベントの値の一覧が含まれます。

バッチ リスナー モードでの、Event Hubs のメッセージおよびイベントのプロパティと、Spring のメッセージ ヘッダーのマッピング:

Event Hubs イベントのプロパティ Spring Batch メッセージ ヘッダー定数 説明
エンキューされた時刻 EventHubsHeaders#ENQUEUED_TIME インスタントの一覧 各イベントがイベント ハブ パーティションにエンキューされたときの日時 (UTC) の一覧。
オフセット EventHubsHeaders#OFFSET 長の一覧 関連付けられたイベント ハブ パーティションから受信された各イベントのオフセットの一覧。
パーティション キー AzureHeaders#PARTITION_KEY 文字列の一覧 最初に各イベントを発行するときに設定された場合は、パーティション ハッシュ キーの一覧。
Sequence number EventHubsHeaders#SEQUENCE_NUMBER 長の一覧 関連付けられたイベント ハブ パーティションにエンキューされたときに各イベントに割り当てられたシーケンス番号の一覧。
System properties (システムのプロパティ) EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES マップの一覧 各イベントのシステム プロパティの一覧。
Application properties EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES マップの一覧 カスタマイズされたすべてのメッセージ ヘッダーまたはイベント プロパティが配置される各イベントのアプリケーション プロパティの一覧。

Note

メッセージを発行するとき、上のすべてのバッチ ヘッダーが、メッセージから削除されます (存在する場合)。

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。

Spring と Azure Service Bus の統合

主要な概念

Spring 統合を使用すると、Spring ベースのアプリケーション内で軽量のメッセージングが可能になり、宣言型アダプターを介した外部システムとの統合がサポートされます。

Spring Integration for Azure Service Bus 拡張プロジェクトでは、Azure Service Bus の送受信チャネル アダプターが提供されます。

Note

CompletableFuture サポート API はバージョン 2.10.0 から非推奨になり、バージョン 4.0.0 から Reactor Core に置き換えられています。 詳しくは Javadoc をご覧ください。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

構成

このスターターでは、構成オプションの次の 2 つの部分を提供します。

接続構成プロパティ

このセクションには、Azure Service Bus への接続に使用される構成オプションが含まれます。

Note

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。

spring-cloud-azure-starter-integration-servicebus の構成可能な接続プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.servicebus.enabled boolean Azure Service Bus が有効になっているかどうか。
spring.cloud.azure.servicebus.connection-string String Service Bus 名前空間の接続文字列の値。
spring.cloud.azure.servicebus.namespace String Service Bus 名前空間の値。これは FQDN のプレフィックスです。 FQDN は NamespaceName.DomainName で構成する必要があります
spring.cloud.azure.servicebus.domain-name String Azure Service Bus 名前空間の値のドメイン名。

Service Bus プロセッサの構成プロパティ

ServiceBusInboundChannelAdapterServiceBusProcessorClient を使用してメッセージを消費し、ServiceBusProcessorClient の全体的なプロパティを構成します。開発者は、ServiceBusContainerProperties を構成に使用できます。 ServiceBusInboundChannelAdapter の使用方法については、後のセクションを参照してください。

基本的な使用方法

Azure Service Bus にメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  1. メッセージを ServiceBusTemplate Service Bus に送信する Bean を使用して作成DefaultMessageHandlerし、ServiceBusTemplate のエンティティ型を設定します。 このサンプルでは、例として Service Bus キューを使用します。

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Azure Service Bus からメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ チャネルの Bean を作成します。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Service Bus にメッセージをServiceBusMessageListenerContainer受信する Bean を使用して作成ServiceBusInboundChannelAdapterします。 このサンプルでは、例として Service Bus キューを使用します。

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 前に作成したメッセージ チャネルを ServiceBusInboundChannelAdapter 使用して、メッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

objectMapper をカスタマイズするように ServiceBusMessageConverter を構成する

ServiceBusMessageConverter は、ユーザーがカスタマイズ ObjectMapperできるように構成可能な Bean として作成されます。

Service Bus のメッセージ ヘッダー

複数の Spring ヘッダー定数にマップできる一部の Service Bus ヘッダーについては、異なる Spring ヘッダーの優先順位が一覧表示されます。

Service Bus のヘッダーと Spring のヘッダーの間のマッピング:

Service Bus メッセージのヘッダーとプロパティ Spring メッセージ ヘッダー定数 Type コンフィギュレーション可能 説明
コンテンツ タイプ MessageHeaders#CONTENT_TYPE String はい メッセージの RFC2045 Content-Type 記述子。
関連付け ID ServiceBusMessageHeaders#CORRELATION_ID String はい メッセージの関連付け ID。
メッセージ ID ServiceBusMessageHeaders#MESSAGE_ID String はい メッセージのメッセージ ID。このヘッダーは MessageHeaders#ID より高い優先度です。
メッセージ ID MessageHeaders#ID UUID はい メッセージのメッセージ ID。このヘッダーは ServiceBusMessageHeaders#MESSAGE_ID より低い優先度です。
パーティション キー ServiceBusMessageHeaders#PARTITION_KEY String はい パーティション分割されたエンティティにメッセージを送信するためのパーティション キー。
返信先 MessageHeaders#REPLY_CHANNEL String はい 返信の送信先のエンティティのアドレス。
返信先セッション ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID String はい メッセージの ReplyToGroupId プロパティの値。
スケジュールされたエンキュー日時 (UTC) ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime はい メッセージを Service Bus にエンキューする必要がある日時。このヘッダーは AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE より高い優先度です。
スケジュールされたエンキュー日時 (UTC) AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE 整数型 はい メッセージを Service Bus にエンキューする必要がある日時。このヘッダーは ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME より低い優先度です。
セッション ID ServiceBusMessageHeaders#SESSION_ID String はい セッション対応エンティティのセッション IDentifier。
Time to Live ServiceBusMessageHeaders#TIME_TO_LIVE Duration はい このメッセージの有効期限が切れるまでの時間。
受信先 ServiceBusMessageHeaders#TO String はい メッセージの "送信先" アドレス。ルーティング シナリオでの将来の使用のために予約されており、現在はブローカー自体で無視されます。
情報カテゴリ ServiceBusMessageHeaders#SUBJECT String はい メッセージの件名。
配信不能エラーの説明 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String いいえ 配信不能にされたメッセージの説明。
配信不能の理由 ServiceBusMessageHeaders#DEAD_LETTER_REASON String いいえ メッセージが配信不能になった理由。
配信不能メッセージのソース ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String いいえ メッセージが配信不能になったエンティティ。
配信回数 ServiceBusMessageHeaders#DELIVERY_COUNT long いいえ このメッセージがクライアントに配信された回数。
エンキューされたシーケンス番号 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long いいえ Service Bus によってメッセージに割り当てられるエンキューされたシーケンス番号。
エンキューされた時刻 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime いいえ このメッセージが Service Bus にエンキューされた日時。
有効期限 ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime いいえ このメッセージの有効期限が切れる日時。
ロック トークン ServiceBusMessageHeaders#LOCK_TOKEN String いいえ 現在のメッセージのロック トークン。
ロック期限 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime いいえ このメッセージのロックの有効期限が切れる日時。
Sequence number ServiceBusMessageHeaders#SEQUENCE_NUMBER long いいえ Service Bus によってメッセージに割り当てられる一意の番号。
状態 ServiceBusMessageHeaders#STATE ServiceBusMessageState いいえ メッセージの状態。Active、Deferred、または Scheduled です。

パーティション キーのサポート

このスターターは、メッセージ ヘッダーでパーティション キーとセッション ID を設定できるようにすることで、Service Bus のパーティション分割をサポートします。 このセクションでは、メッセージのパーティション キーを設定する方法について説明します。

推奨: 。ヘッダーのキーとして ServiceBusMessageHeaders.PARTITION_KEY を使用します。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

AzureHeaders.PARTITION_KEY はヘッダーのキーとして推奨されませんが、現在サポートされています。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Note

ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY の両方がメッセージ ヘッダーに設定されている場合は、 ServiceBusMessageHeaders.PARTITION_KEY が優先されます。

セッションのサポート

この例では、アプリケーションでメッセージのセッション ID を手動で設定する方法を示します。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Note

ServiceBusMessageHeaders.SESSION_IDメッセージ ヘッダーに設定され、別ServiceBusMessageHeaders.PARTITION_KEYのヘッダーも設定されている場合は、最終的にセッション ID の値を使用してパーティション キーの値が上書きされます。

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。

Spring と Azure Storage Queue の統合

主要な概念

Azure Queue storage は、多数のメッセージを格納するためのサービスです。 メッセージには、HTTP または HTTPS を使用して、認証された呼び出しを介して世界中のどこからでもアクセスできます。 キュー メッセージの許容される最大サイズは 64 KB です。 キューには、ストレージ アカウントの総容量の上限を超えない限り、数百万のメッセージを含めることができます。 キューは通常、非同期的な処理用に作業のバックログを作成するために使用されます。

依存関係のセットアップ

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

構成

このスターターでは、次の構成オプションを提供します。

接続構成プロパティ

このセクションには、Azure Storage キューへの接続に使用される構成オプションが含まれます。

Note

セキュリティ プリンシパルを使用して、Azure リソースにアクセスするための Microsoft Entra ID による認証と承認を行う場合は、「Microsoft Entra ID によるアクセスの承認」を参照して、Azure リソースにアクセスするための十分なアクセス許可がセキュリティ プリンシパルに付与されていることを確認してください。

spring-cloud-azure-starter-integration-storage-queue の構成可能な接続プロパティ:

プロパティ タイプ 説明
spring.cloud.azure.storage.queue.enabled boolean Azure Storage キューが有効かどうか。
spring.cloud.azure.storage.queue.connection-string String Storage キュー名前空間の接続文字列の値。
spring.cloud.azure.storage.queue.accountName String Storage キューのアカウント名。
spring.cloud.azure.storage.queue.accountKey String Storage キューのアカウント キー。
spring.cloud.azure.storage.queue.endpoint String Storage キュー サービスのエンドポイント。
spring.cloud.azure.storage.queue.sasToken String SAS トークンの資格情報
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion は API 要求を行うときに使用されます。
spring.cloud.azure.storage.queue.messageEncoding String キュー メッセージのエンコード。

基本的な使用方法

Azure Storage キューにメッセージを送信する

  1. 資格情報の構成オプションを入力します。

    • 接続文字列資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 資格情報をマネージド ID として使用するには、application.yml ファイルで次のプロパティを構成します。

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  • サービス プリンシパルとしての資格情報の場合は、application.yml ファイルで次のプロパティを構成します。

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Note

使用できる tenant-id 値は、次のとおりです。 commonorganizationsconsumers、またはテナント ID です。 これらの値の詳細については、「エラー AADSTS50020 - ID プロバイダーのユーザー アカウントがテナントに存在しない」の「間違ったエンドポイント (個人アカウントと組織アカウント) を使用する」セクションを参照してください。 シングルテナント アプリの変換の詳細については、「Microsoft Entra ID でシングルテナント アプリをマルチテナントに変換する」を参照してください

  1. Bean を使用してStorageQueueTemplate作成DefaultMessageHandlerし、ストレージ キューにメッセージを送信します。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. メッセージ チャネルを使用し、上のメッセージ ハンドラーでメッセージ ゲートウェイ バインドを作成します。

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. ゲートウェイを使用してメッセージを送信します。

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Azure Storage キューからメッセージを受信する

  1. 資格情報の構成オプションを入力します。

  2. 入力チャネルとしてメッセージ チャネルの Bean を作成します。

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Bean を使用してStorageQueueTemplate作成StorageQueueMessageSourceし、ストレージ キューにメッセージを受信します。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 前に作成したメッセージ チャネルを使用し、最後のステップで作成した StorageQueueMessageSource でメッセージ レシーバー バインドを作成します。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

サンプル

詳細については、GitHub の azure-spring-boot-samples リポジトリを参照してください。