Partager via


Spring Cloud support Azure pour Spring Integration

Cet article s’applique à : ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Integration Extension pour Azure fournit des adaptateurs Spring Integration pour les différents services fournis par le Kit de développement logiciel (SDK) Azure pour Java. Nous fournissons la prise en charge de Spring Integration pour ces services Azure : Event Hubs, Service Bus, Stockage File d’attente. Voici une liste des adaptateurs pris en charge :

Intégration spring à Azure Event Hubs

Concepts clés

Azure Event Hubs est une plateforme de streaming de Big Data et un service d’ingestion d’événements. Il peut recevoir et traiter des millions d’événements par seconde. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide d’adaptateurs de traitement par lot/stockage ou d’un fournisseur d’analyse en temps réel.

Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs. Ces adaptateurs fournissent un niveau d’abstraction supérieur sur la prise en charge de Spring pour la communication à distance, la messagerie et la planification. Le projet d’extension Spring Integration pour Event Hubs fournit des adaptateurs de canal entrants et sortants et des passerelles pour Azure Event Hubs.

Remarque

Les API de prise en charge rxJava sont supprimées de la version 4.0.0. Pour plus d’informations, consultez Javadoc.

Groupe de consommateurs

Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages à l’intérieur de Stockage Azure.

Prise en charge du partitionnement

Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement à la rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer la partition détenue par le consommateur. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions de la plupart des consommateurs chargés pour atteindre l’équilibrage de charge de travail.

Pour spécifier la stratégie d’équilibrage de charge, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration EventHubsContainerProperties.

Prise en charge des consommateurs Batch

Prend EventHubsInboundChannelAdapter en charge le mode de consommation par lots. Pour l’activer, les utilisateurs peuvent spécifier le mode d’écouteur comme ListenerMode.BATCH lors de la construction d’une EventHubsInboundChannelAdapter instance. Lorsqu’elle est activée, un message dont la charge utile est une liste d’événements par lots seront reçus et transmis au canal en aval. Chaque en-tête de message est également converti en tant que liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Pour les en-têtes communaux de l’ID de partition, case activée pointer et les dernières propriétés mises en file d’attente, ils sont présentés comme une valeur unique pour l’ensemble du lot d’événements partagent le même. Pour plus d’informations, consultez la section En-têtes de message Event Hubs .

Remarque

L’en-tête case activée point existe uniquement lorsque le mode MANUAL case activée point est utilisé.

Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH et MANUAL. BATCHil s’agit d’un mode de case activée point automatique pour case activée pointer l’ensemble du lot d’événements une fois qu’ils sont reçus. MANUALle mode consiste à case activée pointer les événements par les utilisateurs. Lorsqu’il est utilisé, le point de contrôle est passé dans l’en-tête de message, et les utilisateurs peuvent l’utiliser pour effectuer case activée pointage.

La stratégie de consommation de lots peut être spécifiée par les propriétés et max-sizemax-wait-time, où max-size est une propriété nécessaire pendant qu’elle max-wait-time est facultative. Pour spécifier la stratégie de consommation de lots, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration EventHubsContainerProperties.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuration

Ce démarrage fournit les 3 parties suivantes des options de configuration :

Propriétés de configuration de Connecter ion

Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.

Remarque

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.enabled booléen Indique si azure Event Hubs est activé.
spring.cloud.azure.eventhubs.connection-string Chaîne Valeur chaîne de connexion de l’espace de noms Event Hubs.
spring.cloud.azure.eventhubs.namespace Chaîne Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Chaîne Nom de domaine d’une valeur d’espace de noms Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Chaîne Adresse de point de terminaison personnalisée.
spring.cloud.azure.eventhubs.shared-connection Boolean Indique si EventProcessorClient et EventHubProducerAsyncClient sous-jacents utilisent la même connexion. Par défaut, une nouvelle connexion est construite et utilisée pour chaque client Event Hub créé.

Propriétés de configuration de point de contrôle

Cette section contient les options de configuration du service Stockage Blobs, qui est utilisé pour conserver la propriété de partition et les informations case activée point.

Remarque

À partir de la version 4.0.0, lorsque la propriété spring.cloud.azure.eventhubs.processor.case activée point-store.create-container-if-not-exists n’est pas activé manuellement, aucun conteneur Stockage ne sera créé automatiquement.

Contrôle des propriétés configurables de spring-cloud-azure-starter-integration-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.processor. case activée point-store.create-container-if-not-exists Boolean Indique s’il faut autoriser la création de conteneurs s’il n’existe pas.
spring.cloud.azure.eventhubs.processor. case activée point-store.account-name Chaîne Nom du compte de stockage.
spring.cloud.azure.eventhubs.processor. case activée point-store.account-key Chaîne Clé d’accès au compte de stockage.
spring.cloud.azure.eventhubs.processor. case activée point-store.container-name Chaîne Stockage nom du conteneur.

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour Stockage magasin d’objets blob case activée point. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure. unifié ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriétés de configuration du processeur Event Hub

L’utilisation EventHubsInboundChannelAdapter des EventProcessorClient messages à partir d’un hub d’événements permet de configurer les propriétés globales d’un EventProcessorClient, les développeurs peuvent utiliser EventHubsContainerProperties pour la configuration. Consultez la section suivante sur l’utilisation EventHubsInboundChannelAdapterde .

Utilisation de base

Envoyer des messages à Azure Event Hubs

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  1. Créez DefaultMessageHandler avec le EventHubsTemplate bean pour envoyer des messages à Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recevoir des messages d’Azure Event Hubs

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez EventHubsInboundChannelAdapter avec le EventHubsMessageListenerContainer bean pour recevoir des messages d’Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Créez une liaison de récepteur de messages avec EventHubsInboundChannelAdapter via le canal de message créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurer EventHubsMessageConverter pour personnaliser objectMapper

EventHubsMessageConverter est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.

Prise en charge des consommateurs Batch

Pour consommer des messages à partir d’Event Hubs par lots, il est similaire à l’exemple ci-dessus, en plus des utilisateurs doivent définir les options de configuration associées au traitement par lots pour EventHubsInboundChannelAdapter.

Lorsque vous créez EventHubsInboundChannelAdapter, le mode écouteur doit être défini sur BATCH. Lors de la création d’unan, EventHubsMessageListenerContainerdéfinissez le mode case activée point en tant que MANUAL ou BATCH, et les options de traitement par lots peuvent être configurées en fonction des besoins.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

En-têtes de message Event Hubs

Le tableau suivant illustre la façon dont les propriétés de message Event Hubs sont mappées aux en-têtes de message Spring. Pour Azure Event Hubs, le message est appelé en tant que event.

Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode d’écoute d’enregistrement :

Propriétés d’événement Event Hubs Constantes d’en-tête de message Spring Type Description
Heure en file d’attente EventHubsHeaders#ENQUEUED_TIME Instantané Instant, en UTC, de l’heure à laquelle l’événement a été mis en file d’attente dans la partition Event Hub.
Contrepartie EventHubsHeaders#OFFSET Long Décalage de l’événement lorsqu’il a été reçu de la partition Event Hub associée.
Clé de partition AzureHeaders#PARTITION_KEY Chaîne Clé de hachage de partition si elle a été définie lors de la publication initiale de l’événement.
ID de partition AzureHeaders#RAW_PARTITION_ID Chaîne ID de partition du hub d’événements.
Numéro de séquence EventHubsHeaders#SEQUENCE_NUMo ER Long Numéro de séquence affecté à l’événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée.
Dernières propriétés d’événement en file d’attente EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Propriétés du dernier événement mis en file d’attente dans cette partition.
NA AzureHeaders#CHECKPOINTER Point de contrôle En-tête de case activée pointez le message spécifique.

Les utilisateurs peuvent analyser les en-têtes de message pour les informations associées de chaque événement. Pour définir un en-tête de message pour l’événement, tous les en-têtes personnalisés sont placés en tant que propriété d’application d’un événement, où l’en-tête est défini comme clé de propriété. Lorsque les événements sont reçus d’Event Hubs, toutes les propriétés de l’application sont converties en en-tête de message.

Remarque

Les en-têtes de message de la clé de partition, l’heure mise en file d’attente, le décalage et le numéro de séquence ne sont pas pris en charge pour être définis manuellement.

Lorsque le mode consommateur par lots est activé, les en-têtes spécifiques des messages par lots sont répertoriés ci-dessous, qui contient une liste de valeurs de chaque événement Event Hubs unique.

Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode Écouteur Batch :

Propriétés d’événement Event Hubs Constantes d’en-tête de message Spring Batch Type Description
Heure en file d’attente EventHubsHeaders#ENQUEUED_TIME Liste des instantanés Liste de l’instant, au format UTC, de l’heure à laquelle chaque événement a été mis en file d’attente dans la partition Event Hub.
Contrepartie EventHubsHeaders#OFFSET Liste de longs Liste du décalage de chaque événement lorsqu’il a été reçu de la partition Event Hub associée.
Clé de partition AzureHeaders#PARTITION_KEY Liste de chaînes Liste de la clé de hachage de partition si elle a été définie lors de la publication initiale de chaque événement.
Numéro de séquence EventHubsHeaders#SEQUENCE_NUMo ER Liste de longs Liste du numéro de séquence affecté à chaque événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée.
Propriétés système EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Liste des mappages Liste des propriétés système de chaque événement.
Propriétés de l’application EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Liste des mappages Liste des propriétés d’application de chaque événement, où tous les en-têtes de message personnalisés ou propriétés d’événement sont placés.

Remarque

Lors de la publication de messages, tous les en-têtes de lot ci-dessus sont supprimés des messages s’il existe.

Exemples

Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.

Intégration de Spring à Azure Service Bus

Concepts clés

Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs.

Le projet d’extension Spring Integration pour Azure Service Bus fournit des adaptateurs de canal entrants et sortants pour Azure Service Bus.

Remarque

Les API de prise en charge completFuture ont été déconseillées à partir de la version 2.10.0 et sont remplacées par Reactor Core à partir de la version 4.0.0. Pour plus d’informations, consultez Javadoc.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuration

Ce démarrage fournit les 2 parties suivantes des options de configuration :

propriétés de configuration de Connecter ion

Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.

Remarque

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-servicebus :

Propriété Type Description
spring.cloud.azure.servicebus.enabled booléen Indique si Azure Service Bus est activé.
spring.cloud.azure.servicebus.connection-string Chaîne Valeur chaîne de connexion espace de noms Service Bus.
spring.cloud.azure.servicebus.namespace Chaîne Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Chaîne Nom de domaine d’une valeur d’espace de noms Azure Service Bus.

Propriétés de configuration du processeur Service Bus

L’utilisation ServiceBusInboundChannelAdapter des ServiceBusProcessorClient messages permet de configurer les propriétés globales d’un ServiceBusProcessorClient, les développeurs peuvent utiliser ServiceBusContainerProperties pour la configuration. Consultez la section suivante sur l’utilisation ServiceBusInboundChannelAdapterde .

Utilisation de base

Envoyer des messages à Azure Service Bus

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  1. Créez DefaultMessageHandler avec le ServiceBusTemplate bean pour envoyer des messages à Service Bus, définissez le type d’entité pour ServiceBusTemplate. Cet exemple utilise la file d’attente Service Bus comme exemple.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recevoir des messages d’Azure Service Bus

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez ServiceBusInboundChannelAdapter avec le ServiceBusMessageListenerContainer bean pour recevoir des messages vers Service Bus. Cet exemple utilise la file d’attente Service Bus comme exemple.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Créez une liaison de récepteur de messages via ServiceBusInboundChannelAdapter le canal de message que nous avons créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurer ServiceBusMessageConverter pour personnaliser objectMapper

ServiceBusMessageConverter est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.

En-têtes de message Service Bus

Pour certains en-têtes Service Bus qui peuvent être mappés à plusieurs constantes d’en-tête Spring, la priorité des différents en-têtes Spring est répertoriée.

Mappage entre les en-têtes Service Bus et les en-têtes Spring :

En-têtes et propriétés des messages Service Bus Constantes d’en-tête de message Spring Type Configurable Description
Type de contenu MessageHeaders#CONTENT_TYPE Chaîne Oui Descripteur de type de contenu du message RFC2045.
ID de corrélation : ServiceBusMessageHeaders#CORRELATION_ID Chaîne Oui ID de corrélation du message
ID de message ServiceBusMessageHeaders#MESSAGE_ID Chaîne Oui ID de message du message, cet en-tête a une priorité supérieure à MessageHeaders#ID.
ID de message MessageHeaders#ID UUID Oui ID de message du message, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#MESSAGE_ID.
Clé de partition ServiceBusMessageHeaders#PARTITION_KEY Chaîne Oui Clé de partition pour l’envoi du message à une entité partitionnée.
Répondre à MessageHeaders#REPLY_CHANNEL Chaîne Oui Adresse d’une entité à laquelle envoyer des réponses.
Répondre à l’ID de session ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Chaîne Oui Valeur de propriété ReplyToGroupId du message.
Heure de file d’attente planifiée utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Oui Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité supérieure à AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Heure de file d’attente planifiée utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Entier Oui Date et heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID session ServiceBusMessageHeaders#SESSION_ID Chaîne Oui IDentifier de session pour une entité prenant en charge la session.
Durée de vie ServiceBusMessageHeaders#TIME_TO_LIVE Durée Oui Durée avant l’expiration de ce message.
À ServiceBusMessageHeaders#TO Chaîne Oui Adresse « à » du message, réservée à une utilisation ultérieure dans les scénarios de routage et actuellement ignorée par le répartiteur lui-même.
Objet ServiceBusMessageHeaders#SUBJECT Chaîne Oui Objet du message.
Description de l’erreur de lettre morte ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Chaîne Non Description d’un message qui a été lettre morte.
Raison de la lettre morte ServiceBusMessageHeaders#DEAD_LETTER_REASON Chaîne Non La raison pour laquelle un message a été mis en lettres mortes.
Source de lettres mortes ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Chaîne Non Entité dans laquelle le message a été mis en lettres mortes.
Nombre de livraisons ServiceBusMessageHeaders#DELIVERY_COUNT long Non Nombre de fois où ce message a été remis aux clients.
Numéro de séquence mis en file d’attente ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long Non Numéro de séquence mis en file d’attente affecté à un message par Service Bus.
Heure en file d’attente ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Non Date à laquelle ce message a été mis en file d’attente dans Service Bus.
Expire à ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Non Date d’expiration de ce message.
Jeton de verrouillage ServiceBusMessageHeaders#LOCK_TOKEN Chaîne Non Jeton de verrouillage pour le message actuel.
Verrouillé jusqu’à ce que ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Non Date d’expiration du verrou de ce message.
Numéro de séquence ServiceBusMessageHeaders#SEQUENCE_NUMBER long Non Numéro unique affecté à un message par Service Bus.
State ServiceBusMessageHeaders#STATE ServiceBusMessageState Non État du message, qui peut être actif, différé ou planifié.

Prise en charge des clés de partition

Ce démarrage prend en charge le partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.

Recommandé : Utilisez ServiceBusMessageHeaders.PARTITION_KEY comme clé de l’en-tête.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Non recommandé mais actuellement pris en charge :AzureHeaders.PARTITION_KEY comme clé de l’en-tête.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Remarque

Lorsque les deux ServiceBusMessageHeaders.PARTITION_KEY et AzureHeaders.PARTITION_KEY sont définis dans les en-têtes de message, ServiceBusMessageHeaders.PARTITION_KEY il est préférable.

Prise en charge des sessions

Cet exemple montre comment définir manuellement l’ID de session d’un message dans l’application.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Remarque

Lorsque l’élément ServiceBusMessageHeaders.SESSION_ID est défini dans les en-têtes de message et qu’un en-tête différent ServiceBusMessageHeaders.PARTITION_KEY est également défini, la valeur de l’ID de session sera finalement utilisée pour remplacer la valeur de la clé de partition.

Exemples

Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.

Intégration de Spring au stockage File d’attente Azure

Concepts clés

Stockage File d’attente Azure est un service permettant de stocker un grand nombre de messages. Vous accédez aux messages depuis n’importe où dans le monde par le biais d’appels authentifiés à l’aide du protocole HTTP ou HTTPS. La taille maximale d’un message de file d’attente est de 64 Ko. Une file d’attente peut contenir des millions de messages, dans la limite de la capacité totale d’un compte de stockage. Les files d’attente sont couramment utilisées pour créer un backlog de travail à traiter de façon asynchrone.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuration

Ce démarrage fournit les options de configuration suivantes :

propriétés de configuration de Connecter ion

Cette section contient les options de configuration utilisées pour la connexion à Stockage Azure File d’attente.

Remarque

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-storage-queue :

Propriété Type Description
spring.cloud.azure.storage.queue.enabled booléen Indique si une file d’attente Stockage Azure est activée.
spring.cloud.azure.storage.queue.connection-string Chaîne Stockage valeur d’espace de noms de file d’attente chaîne de connexion.
spring.cloud.azure.storage.queue.accountName Chaîne Stockage nom du compte de file d’attente.
spring.cloud.azure.storage.queue.accountKey Chaîne Stockage clé de compte de file d’attente.
spring.cloud.azure.storage.queue.endpoint Chaîne Stockage point de terminaison du service file d’attente.
spring.cloud.azure.storage.queue.sasToken Chaîne Informations d’identification du jeton Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion utilisée lors de l’envoi de requêtes d’API.
spring.cloud.azure.storage.queue.messageEncoding Chaîne Encodage des messages de file d’attente.

Utilisation de base

Envoyer des messages à Stockage Azure File d’attente

  1. Renseignez les options de configuration des informations d’identification.

    • Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  1. Créez DefaultMessageHandler avec le StorageQueueTemplate haricot pour envoyer des messages à Stockage File d’attente.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envoyez des messages à l’aide de la passerelle.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Recevoir des messages de Stockage Azure File d’attente

  1. Renseignez les options de configuration des informations d’identification.

  2. Créez unan de canal de message en tant que canal d’entrée.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Créez StorageQueueMessageSource avec le StorageQueueTemplate bean pour recevoir des messages dans Stockage File d’attente.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Créez une liaison de récepteur de messages avec Stockage QueueMessageSource créée à la dernière étape via le canal de message que nous avons créé précédemment.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Exemples

Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.