Spring Cloud support Azure pour Spring Cloud Stream
Cet article s’applique à : ✔️ Version 4.14.0 ✔️ Version 5.8.0
Spring Cloud Stream est un framework permettant de créer des microservices hautement évolutifs basés sur des événements connectés à des systèmes de messagerie partagés.
Le framework fournit un modèle de programmation flexible basé sur des idiomes spring déjà établis et familiers et des meilleures pratiques. Ces meilleures pratiques incluent la prise en charge de la sémantique pub/sous-sémantique persistante, des groupes de consommateurs et des partitions avec état.
Les implémentations de classeur actuelles sont les suivantes :
spring-cloud-azure-stream-binder-eventhubs
- Pour plus d’informations, consultez Spring Cloud Stream Binder pour Azure Event Hubsspring-cloud-azure-stream-binder-servicebus
- pour plus d’informations, consultez Spring Cloud Stream Binder pour Azure Service Bus
Spring Cloud Stream Binder pour Azure Event Hubs
Concepts clés
Spring Cloud Stream Binder pour Azure Event Hubs fournit l’implémentation de liaison pour l’infrastructure Spring Cloud Stream. Cette implémentation utilise les adaptateurs de canal Spring Integration Event Hubs à sa base. Du point de vue de la conception, Event Hubs est similaire à Kafka. En outre, Event Hubs est accessible via l’API Kafka. Si votre projet a une dépendance étroite avec l’API Kafka, vous pouvez essayer Event Hub avec l’exemple d’API Kafka
Groupe de consommateurs
Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages à l’intérieur de Stockage Azure.
Prise en charge du partitionnement
Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement au rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer quel consommateur possède la partition. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions des consommateurs les plus chargés pour atteindre l’équilibre de charge de travail.
Pour spécifier la stratégie d’équilibrage de charge, les propriétés de celles-ci spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.*
sont fournies. Pour plus d’informations, consultez la section Propriétés du consommateur.
Prise en charge des consommateurs Batch
Le classeur Spring Cloud Azure Stream Event Hubs prend en charge la fonctionnalité consommateur Spring Cloud Stream Batch.
Pour utiliser le mode batch-consumer, définissez la spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
propriété true
sur . Lorsqu’il est activé, un message avec une charge utile d’une liste d’événements par lots est reçu et transmis à la Consumer
fonction. Chaque en-tête de message est également converti en liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Les en-têtes communaux de l’ID de partition, case activée pointer et les dernières propriétés mises en file d’attente sont présentés sous forme de valeur unique, car l’ensemble du lot d’événements partage la même valeur. Pour plus d’informations, consultez la section En-têtes de message Event Hubs de Spring Cloud support Azure pour Spring Integration.
Remarque
L’en-tête case activée point existe uniquement lorsque le MANUAL
mode case activée point est utilisé.
Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH
et MANUAL
. BATCH
il s’agit d’un mode de case activée point automatique pour case activée pointer l’ensemble du lot d’événements une fois que le classeur les reçoit. MANUAL
le mode consiste à case activée pointer les événements par les utilisateurs. Lorsqu’il est utilisé, le Checkpointer
message est passé dans l’en-tête de message et les utilisateurs peuvent l’utiliser pour effectuer case activée pointage.
Vous pouvez spécifier la taille du lot en définissant les max-size
propriétés qui max-wait-time
ont un préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.
. La max-size
propriété est nécessaire et la max-wait-time
propriété est facultative. Pour plus d’informations, consultez la section Propriétés du consommateur.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>
Vous pouvez également utiliser Spring Cloud Azure Stream Event Hubs Starter, comme illustré dans l’exemple suivant pour Maven :
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>
Configuration
Le classeur fournit les trois parties suivantes des options de configuration :
propriétés de configuration de Connecter ion
Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.
Remarque
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
propriétés configurables Connecter ion de spring-cloud-azure-stream-binder-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booléen | Indique si azure Event Hubs est activé. |
spring.cloud.azure.eventhubs.connection-string | Chaîne | Valeur chaîne de connexion de l’espace de noms Event Hubs. |
spring.cloud.azure.eventhubs.namespace | Chaîne | Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Chaîne | Nom de domaine d’une valeur d’espace de noms Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Chaîne | Adresse de point de terminaison personnalisée. |
Conseil
Les options de configuration courantes du Kit de développement logiciel (SDK) Du service Azure Service sont également configurables pour le classeur Spring Cloud Azure Stream Event Hubs. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure.
unifié ou le préfixe de spring.cloud.azure.eventhubs.
.
Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération des chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data
des rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.
Propriétés de configuration de point de contrôle
Cette section contient les options de configuration du service Stockage Blobs, qui est utilisé pour conserver la propriété de partition et les informations case activée point.
Remarque
À partir de la version 4.0.0, lorsque la propriété spring.cloud.azure.eventhubs.processor.case activée point-store.create-container-if-not-exists n’est pas activé manuellement, aucun conteneur Stockage ne sera créé automatiquement avec le nom de spring.cloud.stream.bindings.binding-name.destination.
Contrôle des propriétés configurables de spring-cloud-azure-stream-binder-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.processor. case activée point-store.create-container-if-not-exists | Boolean | Indique s’il faut autoriser la création de conteneurs s’il n’existe pas. |
spring.cloud.azure.eventhubs.processor. case activée point-store.account-name | Chaîne | Nom du compte de stockage. |
spring.cloud.azure.eventhubs.processor. case activée point-store.account-key | Chaîne | Clé d’accès au compte de stockage. |
spring.cloud.azure.eventhubs.processor. case activée point-store.container-name | Chaîne | Stockage nom du conteneur. |
Conseil
Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour Stockage magasin d’objets blob case activée point. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure.
unifié ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriétés de configuration de liaison Azure Event Hubs
Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.
Propriétés du consommateur
Ces propriétés sont exposées via EventHubsConsumerProperties
.
Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.mode | CheckpointMode | Mode point de contrôle utilisé lorsque le consommateur décide comment case activée message point |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.count | Entier | Détermine la quantité de message pour chaque partition à effectuer un point de case activée. Prend effet uniquement lorsque PARTITION_COUNT le mode case activée point est utilisé. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.case activéepoint.interval | Durée | Détermine l’intervalle de temps pour effectuer une case activée point. Prend effet uniquement lorsque TIME le mode case activée point est utilisé. |
spring.cloud.stream.eventhubs.bindings.<binding-name.consumer.batch.max-size | Entier | Nombre maximal d’événements dans un lot. Obligatoire pour le mode consommateur par lots. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time | Durée | Durée maximale pour la consommation de lots. Prend effet uniquement lorsque le mode consommateur par lots est activé et est facultatif. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval | Durée | Durée d’intervalle de mise à jour. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy | LoadBalancingStrategy | Stratégie d’équilibrage de charge. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval | Durée | Durée après laquelle la propriété de la partition expire. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties | Boolean | Indique si le processeur d’événements doit demander des informations sur le dernier événement en file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count | Entier | Nombre utilisé par le consommateur pour contrôler le nombre d’événements que le consommateur Event Hub reçoit activement et file d’attente localement. |
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position | Mapper avec la clé en tant qu’ID de partition et valeurs de StartPositionProperties |
Carte contenant la position d’événement à utiliser pour chaque partition si un point de case activée pour la partition n’existe pas dans case activée storepoint. Cette carte est clé hors de l’ID de partition. |
Remarque
La initial-partition-event-position
configuration accepte une map
spécification de la position initiale pour chaque hub d’événements. Par conséquent, sa clé est l’ID de partition, et la valeur comprend StartPositionProperties
les propriétés de décalage, de numéro de séquence, d’heure de date mise en file d’attente et si inclusive. Par exemple, vous pouvez le définir comme
spring:
cloud:
stream:
eventhubs:
bindings:
<binding-name>:
consumer:
initial-partition-event-position:
0:
offset: earliest
1:
sequence-number: 100
2:
enqueued-date-time: 2022-01-12T13:32:47.650005Z
4:
inclusive: false
Configuration avancée du consommateur
La connexion ci-dessus, case activée point et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.
.
Propriétés du producteur
Ces propriétés sont exposées via EventHubsProducerProperties
.
Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync | booléen | Indicateur de commutateur pour la synchronisation du producteur. Si la valeur est true, le producteur attend une réponse après une opération d’envoi. |
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout | long | Durée d’attente d’une réponse après une opération d’envoi. Prend effet uniquement lorsqu’un producteur de synchronisation est activé. |
Configuration avancée du producteur
La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.
.
Utilisation de base
Envoi et réception de messages depuis/vers Event Hubs
Renseignez les options de configuration avec des informations d’identification.
Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity eventhubs: namespace: ${EVENTHUB_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${EVENTHUB_NAME} group: ${CONSUMER_GROUP} supply-out-0: destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE} eventhubs: bindings: consume-in-0: consumer: checkpoint: mode: MANUAL
Définir le fournisseur et le consommateur.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Prise en charge du partitionnement
Une PartitionSupplier
information de partition fournie par l’utilisateur est créée pour configurer les informations de partition sur le message à envoyer. L’organigramme suivant montre le processus d’obtention de priorités différentes pour l’ID de partition et la clé :
Prise en charge des consommateurs Batch
Fournissez les options de configuration par lots, comme indiqué dans l’exemple suivant :
spring: cloud: function: definition: consume stream: bindings: consume-in-0: destination: ${AZURE_EVENTHUB_NAME} group: ${AZURE_EVENTHUB_CONSUMER_GROUP} consumer: batch-mode: true eventhubs: bindings: consume-in-0: consumer: batch: max-batch-size: 10 # Required for batch-consumer mode max-wait-time: 1m # Optional, the default value is null checkpoint: mode: BATCH # or MANUAL as needed
Définir le fournisseur et le consommateur.
Pour case activée mode de pointage, vous
BATCH
pouvez utiliser le code suivant pour envoyer des messages et les consommer par lots.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Pour case activée mode de pointage, vous
MANUAL
pouvez utiliser le code suivant pour envoyer des messages et consommer/case activée point par lots.@Bean public Consumer<Message<List<String>>> consume() { return message -> { for (int i = 0; i < message.getPayload().size(); i++) { LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload().get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i), ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i)); } Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("\"test"+ i++ +"\"").build(); }; }
Remarque
En mode de consommation par lots, le type de contenu par défaut du classeur Spring Cloud Stream est application/json
, donc assurez-vous que la charge utile du message est alignée sur le type de contenu. Par exemple, lorsque vous utilisez le type de contenu par défaut pour application/json
recevoir des messages avec String
une charge utile, la charge utile doit être JSON String
entourée de guillemets doubles pour le texte d’origine String
. Bien que pour text/plain
le type de contenu, il peut s’agir d’un String
objet directement. Pour plus d’informations, consultez Négociation de type de contenu Spring Cloud Stream.
Traiter les messages d’erreur
Gérer les messages d’erreur de liaison sortante
Par défaut, Spring Integration crée un canal d’erreur global appelé
errorChannel
. Configurez le point de terminaison de message suivant pour gérer les messages d’erreur de liaison sortante :@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Gérer les messages d’erreur de liaison entrante
Spring Cloud Stream Event Hubs Binder prend en charge deux solutions pour gérer les erreurs pour les liaisons de messages entrants : les canaux d’erreur personnalisés et les gestionnaires.
Canal d’erreur :
Spring Cloud Stream fournit un canal d’erreur pour chaque liaison entrante. Un
ErrorMessage
message est envoyé au canal d’erreur. Pour plus d’informations, consultez la documentation relative à la gestion des erreurs dans Spring Cloud Stream.Canal d’erreur par défaut
Vous pouvez utiliser un canal d’erreur global nommé
errorChannel
pour consommer tous les messages d’erreur de liaison entrante. Pour gérer ces messages, configurez le point de terminaison de message suivant :@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Canal d’erreur spécifique à la liaison
Vous pouvez utiliser un canal d’erreur spécifique pour utiliser les messages d’erreur de liaison entrant spécifiques avec une priorité supérieure au canal d’erreur par défaut. Pour gérer ces messages, configurez le point de terminaison de message suivant :
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Remarque
Le canal d’erreur spécifique à la liaison s’exclue mutuellement avec d’autres gestionnaires d’erreurs et canaux fournis.
Gestionnaire d’erreurs :
Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant une instance qui accepte
ErrorMessage
desConsumer
instances. Pour plus d’informations, consultez La gestion des erreurs dans la documentation Spring Cloud Stream.Remarque
Quand un gestionnaire d’erreurs de liaison est configuré, il peut fonctionner avec le canal d’erreur par défaut.
Gestionnaire d’erreurs par défaut de liaison
Configurez un seul
Consumer
bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Vous devez également définir la
spring.cloud.stream.default.error-handler-definition
propriété sur le nom de la fonction.Gestionnaire d’erreurs spécifique à la liaison
Configurez un
Consumer
bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique et a une priorité plus élevée que le gestionnaire d’erreurs par défaut de liaison :@Bean public Consumer<ErrorMessage> myErrorHandler() { return message -> { // consume the error message }; }
Vous devez également définir la
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
propriété sur le nom de la fonction.
En-têtes de message Event Hubs
Pour connaître les en-têtes de message de base pris en charge, consultez la section En-têtes de message Event Hubs de Spring Cloud support Azure pour Spring Integration.
Prise en charge de plusieurs classeurs
Connecter ion à plusieurs espaces de noms Event Hubs est également prise en charge à l’aide de plusieurs classeurs. Cet exemple prend un chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge. Vous pouvez définir des propriétés associées dans les paramètres d’environnement de chaque classeur.
Pour utiliser plusieurs classeurs avec Event Hubs, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${EVENTHUB_NAME_01} group: ${CONSUMER_GROUP_01} supply1-out-0: destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE} consume2-in-0: binder: eventhub-2 destination: ${EVENTHUB_NAME_02} group: ${CONSUMER_GROUP_02} supply2-out-0: binder: eventhub-2 destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE} binders: eventhub-1: type: eventhubs default-candidate: true environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_01} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhub-2: type: eventhubs default-candidate: false environment: spring: cloud: azure: eventhubs: connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT_CONTAINER_02} account-name: ${CHECKPOINT_STORAGE_ACCOUNT} account-key: ${CHECKPOINT_ACCESS_KEY} eventhubs: bindings: consume1-in-0: consumer: checkpoint: mode: MANUAL consume2-in-0: consumer: checkpoint: mode: MANUAL poller: initial-delay: 0 fixed-delay: 1000
Remarque
Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.Nous devons définir deux fournisseurs et deux consommateurs :
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message)) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; }
Provisionnement des ressources
Event Hubs binder prend en charge l’approvisionnement d’event hub et de groupe de consommateurs, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
eventhubs:
resource:
resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Exemples
Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.
Spring Cloud Stream Binder pour Azure Service Bus
Concepts clés
Spring Cloud Stream Binder pour Azure Service Bus fournit l’implémentation de liaison pour Spring Cloud Stream Framework. Cette implémentation utilise les adaptateurs de canal Spring Integration Service Bus à sa base.
Message planifié
Ce classeur prend en charge l’envoi de messages à une rubrique pour le traitement différé. Les utilisateurs peuvent envoyer des messages planifiés avec l’en-tête x-delay
exprimant en millisecondes un délai pour le message. Le message est remis aux rubriques respectives après x-delay
millisecondes.
Groupe de consommateurs
Service Bus Topic fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente.
Ce classeur s’appuie sur Subscription
un sujet pour agir en tant que groupe de consommateurs.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
Vous pouvez également utiliser Spring Cloud Azure Stream Service Bus Starter, comme illustré dans l’exemple suivant pour Maven :
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
Configuration
Le classeur fournit les deux parties suivantes des options de configuration :
propriétés de configuration de Connecter ion
Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.
Remarque
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
propriétés configurables Connecter ion de spring-cloud-azure-stream-binder-servicebus :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.servicebus.enabled | booléen | Indique si Azure Service Bus est activé. |
spring.cloud.azure.servicebus.connection-string | Chaîne | Valeur chaîne de connexion espace de noms Service Bus. |
spring.cloud.azure.servicebus.namespace | Chaîne | Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Chaîne | Nom de domaine d’une valeur d’espace de noms Azure Service Bus. |
Remarque
Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service Service Service Sont configurables pour le classeur Spring Cloud Azure Stream Service Bus. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure.
unifié ou le préfixe de spring.cloud.azure.servicebus.
.
Le classeur prend également en charge Spring Could Azure Resource Manager par défaut. Pour en savoir plus sur la récupération des chaîne de connexion avec des principaux de sécurité qui ne sont pas accordés avec Data
des rôles associés, consultez la section Utilisation de base de Spring Could Azure Resource Manager.
Propriétés de configuration de la liaison Azure Service Bus
Les options suivantes sont divisées en quatre sections : Propriétés du consommateur, Configurations avancées du consommateur, Propriétés du producteur et Configurations de producteur avancées.
Propriétés du consommateur
Ces propriétés sont exposées via ServiceBusConsumerProperties
.
Propriétés configurables par le consommateur de spring-cloud-azure-stream-binder-servicebus :
Propriété | Type | Default | Description |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected | booléen | false | Si les messages ayant échoué sont routés vers la DLQ. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls | Entier | 1 | Nombre maximal de messages simultanés que le client du processeur Service Bus doit traiter. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions | Entier | null | Nombre maximal de sessions simultanées à traiter à tout moment. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled | Boolean | null | Indique si la session est activée. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count | Entier | 0 | Nombre de prérécupérations du client de processeur Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue | Sous-file d’attente | Aucune | Type de la sous-file d’attente à laquelle se connecter. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration | Durée | 5 m | Durée de la poursuite du renouvellement automatique du verrou. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | Mode de réception du client du processeur Service Bus. |
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete | Boolean | true | Indique s’il faut régler automatiquement les messages. Si la valeur est false, un en-tête de message est ajouté pour permettre aux développeurs de Checkpointer régler manuellement les messages. |
Configuration avancée du consommateur
La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation pour chaque consommateur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
.
Propriétés du producteur
Ces propriétés sont exposées via ServiceBusProducerProperties
.
Propriétés configurables par le producteur de spring-cloud-azure-stream-binder-servicebus :
Propriété | Type | Default | Description |
---|---|---|---|
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync | booléen | false | Indicateur de commutateur pour la synchronisation du producteur. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout | long | 10000 | Valeur de délai d’expiration pour l’envoi du producteur. |
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type | ServiceBusEntityType | null | Type d’entité Service Bus du producteur, requis pour le producteur de liaison. |
Important
Lors de l’utilisation du producteur de liaison, la propriété de celle-ci spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type
doit être configurée.
Configuration avancée du producteur
La connexion ci-dessus et la configuration du client du Kit de développement logiciel (SDK) Azure courante prennent en charge la personnalisation de chaque producteur de classeurs, que vous pouvez configurer avec le préfixe spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
.
Utilisation de base
Envoi et réception de messages depuis/vers Service Bus
Renseignez les options de configuration avec des informations d’identification.
Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity servicebus: namespace: ${SERVICEBUS_NAMESPACE} function: definition: consume;supply stream: bindings: consume-in-0: destination: ${SERVICEBUS_ENTITY_NAME} # If you use Service Bus Topic, add the following configuration # group: ${SUBSCRIPTION_NAME} supply-out-0: destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE} servicebus: bindings: consume-in-0: consumer: auto-complete: false supply-out-0: producer: entity-type: queue # set as "topic" if you use Service Bus Topic
Définir le fournisseur et le consommateur.
@Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } @Bean public Supplier<Message<String>> supply() { return () -> { LOGGER.info("Sending message, sequence " + i); return MessageBuilder.withPayload("Hello world, " + i++).build(); }; }
Prise en charge des clés de partition
Le classeur prend en charge le partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.
Spring Cloud Stream fournit une propriété spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
d’expression SpEL de clé de partition. Par exemple, la définition de cette propriété et "'partitionKey-' + headers[<message-header-key>]"
l’ajout d’un en-tête appelé message-header-key. Spring Cloud Stream utilise la valeur de cet en-tête lors de l’évaluation de l’expression pour affecter une clé de partition. Le code suivant fournit un exemple de producteur :
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
};
}
Prise en charge des sessions
Le classeur prend en charge les sessions de message de Service Bus. L’ID de session d’un message peut être défini via l’en-tête du message.
@Bean
public Supplier<Message<String>> generate() {
return () -> {
String value = "random payload";
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build();
};
}
Remarque
Selon le partitionnement Service Bus, l’ID de session a une priorité supérieure à la clé de partition. Par conséquent, lorsque les deux en-têtes ServiceBusMessageHeaders#SESSION_ID
sont ServiceBusMessageHeaders#PARTITION_KEY
définis, la valeur de l’ID de session est finalement utilisée pour remplacer la valeur de la clé de partition.
Traiter les messages d’erreur
Gérer les messages d’erreur de liaison sortante
Par défaut, Spring Integration crée un canal d’erreur global appelé
errorChannel
. Configurez le point de terminaison de message suivant pour gérer le message d’erreur de liaison sortante.@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling outbound binding error: " + message); }
Gérer les messages d’erreur de liaison entrante
Spring Cloud Stream Service Bus Binder prend en charge trois solutions pour gérer les erreurs des liaisons de messages entrantes : le gestionnaire d’erreurs du classeur, les canaux d’erreur personnalisés et les gestionnaires.
Gestionnaire d’erreurs binder :
Le gestionnaire d’erreurs de classeur par défaut gère la liaison entrante. Vous utilisez ce gestionnaire pour envoyer des messages ayant échoué à la file d’attente de lettres mortes lorsqu’il
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
est activé. Sinon, les messages ayant échoué sont abandonnés. À l’exception de la configuration du canal d’erreur spécifique à la liaison, le gestionnaire d’erreurs de classeur prend toujours effet, qu’il existe d’autres gestionnaires d’erreurs personnalisés ou canaux.Canal d’erreur :
Spring Cloud Stream fournit un canal d’erreur pour chaque liaison entrante. Un
ErrorMessage
message est envoyé au canal d’erreur. Pour plus d’informations, consultez la documentation relative à la gestion des erreurs dans Spring Cloud Stream.Canal d’erreur par défaut
Vous pouvez utiliser un canal d’erreur global nommé
errorChannel
pour consommer tous les messages d’erreur de liaison entrante. Pour gérer ces messages, configurez le point de terminaison de message suivant :@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Canal d’erreur spécifique à la liaison
Vous pouvez utiliser un canal d’erreur spécifique pour utiliser les messages d’erreur de liaison entrant spécifiques avec une priorité supérieure au canal d’erreur par défaut. Pour gérer ces messages, configurez le point de terminaison de message suivant :
// Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group @ServiceActivator(inputChannel = "{destination}.{group}.errors") public void handleError(ErrorMessage message) { LOGGER.error("Handling inbound binding error: " + message); }
Remarque
Le canal d’erreur spécifique à la liaison s’exclue mutuellement avec d’autres gestionnaires d’erreurs et canaux fournis.
Gestionnaire d’erreurs :
Spring Cloud Stream expose un mécanisme permettant de fournir un gestionnaire d’erreurs personnalisé en ajoutant une instance qui accepte
ErrorMessage
desConsumer
instances. Pour plus d’informations, consultez La gestion des erreurs dans la documentation Spring Cloud Stream.Remarque
Quand un gestionnaire d’erreurs de liaison est configuré, il peut fonctionner avec le canal d’erreur par défaut et le gestionnaire d’erreurs du classeur.
Gestionnaire d’erreurs par défaut de liaison
Configurez un seul
Consumer
bean pour consommer tous les messages d’erreur de liaison entrante. La fonction par défaut suivante s’abonne à chaque canal d’erreur de liaison entrante :@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Vous devez également définir la
spring.cloud.stream.default.error-handler-definition
propriété sur le nom de la fonction.Gestionnaire d’erreurs spécifique à la liaison
Configurez un
Consumer
bean pour consommer les messages d’erreur de liaison entrant spécifiques. La fonction suivante s’abonne au canal d’erreur de liaison entrant spécifique avec une priorité supérieure au gestionnaire d’erreurs par défaut de liaison.@Bean public Consumer<ErrorMessage> myDefaultHandler() { return message -> { // consume the error message }; }
Vous devez également définir la
spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition
propriété sur le nom de la fonction.
En-têtes de message Service Bus
Pour connaître les en-têtes de message de base pris en charge, consultez la section En-têtes de message Service Bus de Spring Cloud support Azure pour Spring Integration.
Remarque
Lorsque vous définissez la clé de partition, la priorité de l’en-tête de message est supérieure à la propriété Spring Cloud Stream. Prend donc spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
effet uniquement quand aucun des ServiceBusMessageHeaders#SESSION_ID
en-têtes et ServiceBusMessageHeaders#PARTITION_KEY
des en-têtes n’est configuré.
Prise en charge de plusieurs classeurs
Connecter ion à plusieurs espaces de noms Service Bus est également prise en charge à l’aide de plusieurs classeurs. Cet exemple prend chaîne de connexion comme exemple. Les informations d’identification des principaux de service et des identités managées sont également prises en charge, les utilisateurs peuvent définir des propriétés associées dans les paramètres d’environnement de chaque classeur.
Pour utiliser plusieurs classeurs de ServiceBus, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: function: definition: consume1;supply1;consume2;supply2 stream: bindings: consume1-in-0: destination: ${SERVICEBUS_TOPIC_NAME} group: ${SUBSCRIPTION_NAME} supply1-out-0: destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE} consume2-in-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME} supply2-out-0: binder: servicebus-2 destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE} binders: servicebus-1: type: servicebus default-candidate: true environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING} servicebus-2: type: servicebus default-candidate: false environment: spring: cloud: azure: servicebus: connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING} servicebus: bindings: consume1-in-0: consumer: auto-complete: false supply1-out-0: producer: entity-type: topic consume2-in-0: consumer: auto-complete: false supply2-out-0: producer: entity-type: queue poller: initial-delay: 0 fixed-delay: 1000
Remarque
Le fichier d’application précédent montre comment configurer un seul polleur par défaut pour l’application sur toutes les liaisons. Si vous souhaitez configurer l’polleur pour une liaison spécifique, vous pouvez utiliser une configuration telle que
spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000
.nous devons définir deux fournisseurs et deux consommateurs
@Bean public Supplier<Message<String>> supply1() { return () -> { LOGGER.info("Sending message1, sequence1 " + i); return MessageBuilder.withPayload("Hello world1, " + i++).build(); }; } @Bean public Supplier<Message<String>> supply2() { return () -> { LOGGER.info("Sending message2, sequence2 " + j); return MessageBuilder.withPayload("Hello world2, " + j++).build(); }; } @Bean public Consumer<Message<String>> consume1() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message1 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; } @Bean public Consumer<Message<String>> consume2() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message2 received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e -> LOGGER.error("Error found", e)) .block(); }; }
Provisionnement des ressources
Service Bus binder prend en charge l’approvisionnement de files d’attente, de rubriques et d’abonnements, les utilisateurs peuvent utiliser les propriétés suivantes pour activer l’approvisionnement.
spring:
cloud:
azure:
credential:
tenant-id: <tenant>
profile:
subscription-id: ${AZURE_SUBSCRIPTION_ID}
servicebus:
resource:
resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Exemples
Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.