Partager via


Spring Cloud support Azure pour Spring Cloud Stream

Cet article s’applique à : ✔️ Version 4.14.0 ✔️ Version 5.8.0

Spring Cloud Stream est un framework permettant de créer des microservices hautement évolutifs basés sur des événements connectés à des systèmes de messagerie partagés.

Le framework fournit un modèle de programmation flexible basé sur des idiomes spring déjà établis et familiers et des meilleures pratiques. Ces meilleures pratiques incluent la prise en charge de la sémantique pub/sous-sémantique persistante, des groupes de consommateurs et des partitions avec état.

Les implémentations de classeur actuelles sont les suivantes :

Spring Cloud Stream Binder pour Azure Event Hubs

Concepts clés

Spring Cloud Stream Binder pour Azure Event Hubs fournit l’implémentation de liaison pour l’infrastructure Spring Cloud Stream. Cette implémentation utilise les adaptateurs de canal Spring Integration Event Hubs à sa base. Du point de vue de la conception, Event Hubs est similaire à Kafka. En outre, Event Hubs est accessible via l’API Kafka. Si votre projet a une dépendance étroite avec l’API Kafka, vous pouvez essayer Event Hub avec l’exemple d’API Kafka

Groupe de consommateurs

Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages à l’intérieur de Stockage Azure.

Prise en charge du partitionnement

Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement au rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer quel consommateur possède la partition. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions des consommateurs les plus chargés pour atteindre l’équilibre de charge de travail.

Pour spécifier la stratégie d’équilibrage de charge, les propriétés de celles-ci spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* sont fournies. Pour plus d’informations, consultez la section Propriétés du consommateur.

Prise en charge des consommateurs Batch

Le classeur Spring Cloud Azure Stream Event Hubs prend en charge la fonctionnalité consommateur Spring Cloud Stream Batch.

Pour utiliser le mode batch-consumer, définissez la spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode propriété truesur . Lorsqu’il est activé, un message avec une charge utile d’une liste d’événements par lots est reçu et transmis à la Consumer fonction. Chaque en-tête de message est également converti en liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Les en-têtes communaux de l’ID de partition, case activée pointer et les dernières propriétés mises en file d’attente sont présentés sous forme de valeur unique, car l’ensemble du lot d’événements partage la même valeur. Pour plus d’informations, consultez la section En-têtes de message Event Hubs de Spring Cloud support Azure pour Spring Integration.

Remarque

L’en-tête case activée point existe uniquement lorsque le MANUAL mode case activée point est utilisé.

Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH et MANUAL. BATCHil s’agit d’un mode de case activée point automatique pour case activée pointer l’ensemble du lot d’événements une fois que le classeur les reçoit. MANUALle mode consiste à case activée pointer les événements par les utilisateurs. Lorsqu’il est utilisé, le Checkpointer message est passé dans l’en-tête de message et les utilisateurs peuvent l’utiliser pour effectuer case activée pointage.

Vous pouvez spécifier la taille du lot en définissant les max-size propriétés qui max-wait-time ont un préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.. La max-size propriété est nécessaire et la max-wait-time propriété est facultative. Pour plus d’informations, consultez la section Propriétés du consommateur.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

Vous pouvez également utiliser Spring Cloud Azure Stream Event Hubs Starter, comme illustré dans l’exemple suivant pour Maven :

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

Configuration

Le classeur fournit les trois parties suivantes des options de configuration :

propriétés de configuration de Connecter ion

Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.

Remarque

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

propriétés configurables Connecter ion de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.enabled booléen Indique si azure Event Hubs est activé.
spring.cloud.azure.eventhubs.connection-string Chaîne Valeur chaîne de connexion de l’espace de noms Event Hubs.
spring.cloud.azure.eventhubs.namespace Chaîne Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Chaîne Nom de domaine d’une valeur d’espace de noms Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Chaîne Adresse de point de terminaison personnalisée.

Conseil

Les options de configuration courantes du Kit de développement logiciel (SDK) Du service Azure Service sont également configurables pour le classeur Spring Cloud Azure Stream Event Hubs. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure. unifié ou le préfixe de spring.cloud.azure.eventhubs..

Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération des chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data des rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.

Propriétés de configuration de point de contrôle

Cette section contient les options de configuration du service Stockage Blobs, qui est utilisé pour conserver la propriété de partition et les informations case activée point.

Remarque

À partir de la version 4.0.0, lorsque la propriété spring.cloud.azure.eventhubs.processor.case activée point-store.create-container-if-not-exists n’est pas activé manuellement, aucun conteneur Stockage ne sera créé automatiquement avec le nom de spring.cloud.stream.bindings.binding-name.destination.

Contrôle des propriétés configurables de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.azure.eventhubs.processor. case activée point-store.create-container-if-not-exists Boolean Indique s’il faut autoriser la création de conteneurs s’il n’existe pas.
spring.cloud.azure.eventhubs.processor. case activée point-store.account-name Chaîne Nom du compte de stockage.
spring.cloud.azure.eventhubs.processor. case activée point-store.account-key Chaîne Clé d’accès au compte de stockage.
spring.cloud.azure.eventhubs.processor. case activée point-store.container-name Chaîne Stockage nom du conteneur.

Conseil

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour Stockage magasin d’objets blob case activée point. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure. unifié ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriétés de configuration de liaison Azure Event Hubs

Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.

Propriétés du consommateur

Ces propriétés sont exposées via EventHubsConsumerProperties.

Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.mode CheckpointMode Mode point de contrôle utilisé lorsque le consommateur décide comment case activée message point
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.count Entier Détermine la quantité de message pour chaque partition à effectuer un point de case activée. Prend effet uniquement lorsque PARTITION_COUNT le mode case activée point est utilisé.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.interval Durée Détermine l’intervalle de temps pour effectuer une case activée point. Prend effet uniquement lorsque TIME le mode case activée point est utilisé.
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size Entier Nombre maximal d’événements dans un lot. Obligatoire pour le mode consommateur par lots.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time Durée Durée maximale pour la consommation de lots. Prend effet uniquement lorsque le mode consommateur par lots est activé et est facultatif.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval Durée Durée d’intervalle de mise à jour.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy Stratégie d’équilibrage de charge.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval Durée Durée après laquelle la propriété de la partition expire.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties Boolean Indique si le processeur d’événements doit demander des informations sur le dernier événement en file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Entier Nombre utilisé par le consommateur pour contrôler le nombre d’événements que le consommateur Event Hub reçoit activement et file d’attente localement.
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position Mapper avec la clé en tant qu’ID de partition et valeurs de StartPositionProperties Carte contenant la position d’événement à utiliser pour chaque partition si un point de case activée pour la partition n’existe pas dans case activée storepoint. Cette carte est clé hors de l’ID de partition.

Remarque

La initial-partition-event-position configuration accepte une map spécification de la position initiale pour chaque hub d’événements. Par conséquent, sa clé est l’ID de partition, et la valeur comprend StartPositionProperties les propriétés de décalage, de numéro de séquence, d’heure de date mise en file d’attente et si inclusive. Par exemple, vous pouvez le définir comme

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
Configuration avancée du consommateur

La connexion ci-dessus, case activée point et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer..

Propriétés du producteur

Ces propriétés sont exposées via EventHubsProducerProperties.

Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-eventhubs :

Propriété Type Description
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync booléen Indicateur de commutateur pour la synchronisation du producteur. Si la valeur est true, le producteur attend une réponse après une opération d’envoi.
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long Durée d’attente d’une réponse après une opération d’envoi. Prend effet uniquement lorsqu’un producteur de synchronisation est activé.
Configuration avancée du producteur

La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.producer..

Utilisation de base

Envoi et réception de messages depuis/vers Event Hubs

  1. Renseignez les options de configuration avec des informations d’identification.

    • Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. Définir le fournisseur et le consommateur.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Prise en charge du partitionnement

Une PartitionSupplier information de partition fournie par l’utilisateur est créée pour configurer les informations de partition sur le message à envoyer. L’organigramme suivant montre le processus d’obtention de priorités différentes pour l’ID de partition et la clé :

Diagram showing a flowchart of the partitioning support process.

Prise en charge des consommateurs Batch

  1. Fournissez les options de configuration par lots, comme indiqué dans l’exemple suivant :

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. Définir le fournisseur et le consommateur.

    Pour case activée mode de pointage, vous BATCHpouvez utiliser le code suivant pour envoyer des messages et les consommer par lots.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    Pour case activée mode de pointage, vous MANUALpouvez utiliser le code suivant pour envoyer des messages et consommer/case activée point par lots.

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

Remarque

En mode de consommation par lots, le type de contenu par défaut du classeur Spring Cloud Stream est application/json, donc assurez-vous que la charge utile du message est alignée sur le type de contenu. Par exemple, lorsque vous utilisez le type de contenu par défaut pour application/json recevoir des messages avec String une charge utile, la charge utile doit être JSON Stringentourée de guillemets doubles pour le texte d’origine String . Bien que pour text/plain le type de contenu, il peut s’agir d’un String objet directement. Pour plus d’informations, consultez Négociation de type de contenu Spring Cloud Stream.

Traiter les messages d’erreur

  • Gérer les messages d’erreur de liaison sortante

    Par défaut, Spring Integration crée un canal d’erreur global appelé errorChannel. Configurez le point de terminaison de message suivant pour gérer les messages d’erreur de liaison sortante :

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gérer les messages d’erreur de liaison entrante

    Spring Cloud Stream Event Hubs Binder prend en charge deux solutions pour gérer les erreurs pour les liaisons de messages entrants : les canaux d’erreur personnalisés et les gestionnaires.

    Canal d’erreur :

    Spring Cloud Stream fournit un canal d’erreur pour chaque liaison entrante. Un ErrorMessage message est envoyé au canal d’erreur. Pour plus d’informations, consultez la documentation relative à la gestion des erreurs dans Spring Cloud Stream.

    • Canal d’erreur par défaut

      Vous pouvez utiliser un canal d’erreur global nommé errorChannel pour consommer tous les messages d’erreur de liaison entrante. Pour gérer ces messages, configurez le point de terminaison de message suivant :

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Canal d’erreur spécifique à la liaison

      Vous pouvez utiliser un canal d’erreur spécifique pour utiliser les messages d’erreur de liaison entrant spécifiques avec une priorité supérieure au canal d’erreur par défaut. Pour gérer ces messages, configurez le point de terminaison de message suivant :

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Remarque

      Le canal d’erreur spécifique à la liaison s’exclue mutuellement avec d’autres gestionnaires d’erreurs et canaux fournis.

    Gestionnaire d’erreurs :

    Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant une instance qui accepte ErrorMessage des Consumer instances. Pour plus d’informations, consultez La gestion des erreurs dans la documentation Spring Cloud Stream.

    Remarque

    Quand un gestionnaire d’erreurs de liaison est configuré, il peut fonctionner avec le canal d’erreur par défaut.

    • Gestionnaire d’erreurs par défaut de liaison

      Configurez un seul Consumer bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la spring.cloud.stream.default.error-handler-definition propriété sur le nom de la fonction.

    • Gestionnaire d’erreurs spécifique à la liaison

      Configurez un Consumer bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique et a une priorité plus élevée que le gestionnaire d’erreurs par défaut de liaison :

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition propriété sur le nom de la fonction.

En-têtes de message Event Hubs

Pour connaître les en-têtes de message de base pris en charge, consultez la section En-têtes de message Event Hubs de Spring Cloud support Azure pour Spring Integration.

Prise en charge de plusieurs classeurs

Connecter ion à plusieurs espaces de noms Event Hubs est également prise en charge à l’aide de plusieurs classeurs. Cet exemple prend un chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge. Vous pouvez définir des propriétés associées dans les paramètres d’environnement de chaque classeur.

  1. Pour utiliser plusieurs classeurs avec Event Hubs, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Remarque

    Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. Nous devons définir deux fournisseurs et deux consommateurs :

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

Provisionnement des ressources

Event Hubs binder prend en charge l’approvisionnement d’event hub et de groupe de consommateurs, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

Exemples

Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.

Spring Cloud Stream Binder pour Azure Service Bus

Concepts clés

Spring Cloud Stream Binder pour Azure Service Bus fournit l’implémentation de liaison pour Spring Cloud Stream Framework. Cette implémentation utilise les adaptateurs de canal Spring Integration Service Bus à sa base.

Message planifié

Ce classeur prend en charge l’envoi de messages à une rubrique pour le traitement différé. Les utilisateurs peuvent envoyer des messages planifiés avec l’en-tête x-delay exprimant en millisecondes un délai pour le message. Le message est remis aux rubriques respectives après x-delay millisecondes.

Groupe de consommateurs

Service Bus Topic fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Ce classeur s’appuie sur Subscription un sujet pour agir en tant que groupe de consommateurs.

Configuration des dépendances

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

Vous pouvez également utiliser Spring Cloud Azure Stream Service Bus Starter, comme illustré dans l’exemple suivant pour Maven :

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

Configuration

Le classeur fournit les deux parties suivantes des options de configuration :

propriétés de configuration de Connecter ion

Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.

Remarque

Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.

propriétés configurables Connecter ion de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Description
spring.cloud.azure.servicebus.enabled booléen Indique si Azure Service Bus est activé.
spring.cloud.azure.servicebus.connection-string Chaîne Valeur chaîne de connexion espace de noms Service Bus.
spring.cloud.azure.servicebus.namespace Chaîne Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Chaîne Nom de domaine d’une valeur d’espace de noms Azure Service Bus.

Remarque

Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service Service Service Sont configurables pour le classeur Spring Cloud Azure Stream Service Bus. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure. unifié ou le préfixe de spring.cloud.azure.servicebus..

Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération des chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data des rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.

Propriétés de configuration de la liaison Azure Service Bus

Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.

Propriétés du consommateur

Ces propriétés sont exposées via ServiceBusConsumerProperties.

Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Default Description
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected booléen false Si les messages ayant échoué sont routés vers la DLQ.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Entier 1 Nombre maximal de messages simultanés que le client du processeur Service Bus doit traiter.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions Entier null Nombre maximal de sessions simultanées à traiter à tout moment.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled Boolean null Indique si la session est activée.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count Entier 0 Nombre de prérécupérations du client de processeur Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue Sous-file d’attente Aucune Type de la sous-file d’attente à laquelle se connecter.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration Durée 5 m Durée de la poursuite du renouvellement automatique du verrou.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock Mode de réception du client du processeur Service Bus.
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete Boolean true Indique s’il faut régler automatiquement les messages. Si la valeur est false, un en-tête de message est ajouté pour permettre aux développeurs de Checkpointer régler manuellement les messages.
Configuration avancée du consommateur

La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation pour chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.consumer..

Propriétés du producteur

Ces propriétés sont exposées via ServiceBusProducerProperties.

Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-servicebus :

Propriété Type Default Description
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync booléen false Indicateur de commutateur pour la synchronisation du producteur.
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 Valeur de délai d’expiration pour l’envoi du producteur.
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType null Type d’entité Service Bus du producteur, requis pour le producteur de liaison.

Important

Lors de l’utilisation du producteur de liaison, la propriété de celle-ci spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type doit être configurée.

Configuration avancée du producteur

La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.producer..

Utilisation de base

Envoi et réception de messages depuis/vers Service Bus

  1. Renseignez les options de configuration avec des informations d’identification.

    • Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

  • Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. Définir le fournisseur et le consommateur.

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

Prise en charge des clés de partition

Le classeur prend en charge le partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.

Spring Cloud Stream fournit une propriété spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expressiond’expression SpEL de clé de partition. Par exemple, la définition de cette propriété et "'partitionKey-' + headers[<message-header-key>]" l’ajout d’un en-tête appelé message-header-key. Spring Cloud Stream utilise la valeur de cet en-tête lors de l’évaluation de l’expression pour affecter une clé de partition. Le code suivant fournit un exemple de producteur :

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

Prise en charge des sessions

Le classeur prend en charge les sessions de message de Service Bus. L’ID de session d’un message peut être défini via l’en-tête du message.

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

Remarque

Selon le partitionnement Service Bus, l’ID de session a une priorité supérieure à la clé de partition. Par conséquent, lorsque les deux en-têtes ServiceBusMessageHeaders#SESSION_ID sont ServiceBusMessageHeaders#PARTITION_KEY définis, la valeur de l’ID de session est finalement utilisée pour remplacer la valeur de la clé de partition.

Traiter les messages d’erreur

  • Gérer les messages d’erreur de liaison sortante

    Par défaut, Spring Integration crée un canal d’erreur global appelé errorChannel. Configurez le point de terminaison de message suivant pour gérer le message d’erreur de liaison sortante.

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • Gérer les messages d’erreur de liaison entrante

    Spring Cloud Stream Service Bus Binder prend en charge trois solutions pour gérer les erreurs des liaisons de messages entrantes : le gestionnaire d’erreurs du classeur, les canaux d’erreur personnalisés et les gestionnaires.

    Gestionnaire d’erreurs binder :

    Le gestionnaire d’erreurs de classeur par défaut gère la liaison entrante. Vous utilisez ce gestionnaire pour envoyer des messages ayant échoué à la file d’attente de lettres mortes lorsqu’il spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected est activé. Sinon, les messages ayant échoué sont abandonnés. À l’exception de la configuration du canal d’erreur spécifique à la liaison, le gestionnaire d’erreurs de classeur prend toujours effet, qu’il existe d’autres gestionnaires d’erreurs personnalisés ou canaux.

    Canal d’erreur :

    Spring Cloud Stream fournit un canal d’erreur pour chaque liaison entrante. Un ErrorMessage message est envoyé au canal d’erreur. Pour plus d’informations, consultez la documentation relative à la gestion des erreurs dans Spring Cloud Stream.

    • Canal d’erreur par défaut

      Vous pouvez utiliser un canal d’erreur global nommé errorChannel pour consommer tous les messages d’erreur de liaison entrante. Pour gérer ces messages, configurez le point de terminaison de message suivant :

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • Canal d’erreur spécifique à la liaison

      Vous pouvez utiliser un canal d’erreur spécifique pour utiliser les messages d’erreur de liaison entrant spécifiques avec une priorité supérieure au canal d’erreur par défaut. Pour gérer ces messages, configurez le point de terminaison de message suivant :

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      Remarque

      Le canal d’erreur spécifique à la liaison s’exclue mutuellement avec d’autres gestionnaires d’erreurs et canaux fournis.

    Gestionnaire d’erreurs :

    Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant une instance qui accepte ErrorMessage des Consumer instances. Pour plus d’informations, consultez La gestion des erreurs dans la documentation Spring Cloud Stream.

    Remarque

    Quand un gestionnaire d’erreurs de liaison est configuré, il peut fonctionner avec le canal d’erreur par défaut et le gestionnaire d’erreurs du classeur.

    • Gestionnaire d’erreurs par défaut de liaison

      Configurez un seul Consumer bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la spring.cloud.stream.default.error-handler-definition propriété sur le nom de la fonction.

    • Gestionnaire d’erreurs spécifique à la liaison

      Configurez un Consumer bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique avec une priorité supérieure au gestionnaire d’erreurs par défaut de liaison.

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      Vous devez également définir la spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition propriété sur le nom de la fonction.

En-têtes de message Service Bus

Pour connaître les en-têtes de message de base pris en charge, consultez la section En-têtes de message Service Bus de Spring Cloud support Azure pour Spring Integration.

Remarque

Lorsque vous définissez la clé de partition, la priorité de l’en-tête de message est supérieure à la propriété Spring Cloud Stream. Prend donc spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression effet uniquement quand aucun des ServiceBusMessageHeaders#SESSION_ID en-têtes et ServiceBusMessageHeaders#PARTITION_KEY des en-têtes n’est configuré.

Prise en charge de plusieurs classeurs

Connecter ion à plusieurs espaces de noms Service Bus est également prise en charge à l’aide de plusieurs classeurs. Cet exemple prend chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge, les utilisateurs peuvent définir des propriétés associées dans les paramètres d’environnement de chaque classeur.

  1. Pour utiliser plusieurs classeurs de ServiceBus, configurez les propriétés suivantes dans votre fichier application.yml :

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    Remarque

    Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000.

  2. nous devons définir deux fournisseurs et deux consommateurs

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

Provisionnement des ressources

Service Bus binder prend en charge l’approvisionnement de files d’attente, de rubriques et d’abonnements, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

Remarque

Les valeurs autorisées tenant-id sont : common, organizations, consumersou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.

Exemples

Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.