Spring Cloud support Azure pour Spring Integration
Cet article s’applique à : ✔️ Version 4.14.0 ✔️ Version 5.8.0
Spring Integration Extension pour Azure fournit des adaptateurs Spring Integration pour les différents services fournis par le Kit de développement logiciel (SDK) Azure pour Java. Nous fournissons la prise en charge de Spring Integration pour ces services Azure : Event Hubs, Service Bus, Stockage File d’attente. Voici une liste des adaptateurs pris en charge :
spring-cloud-azure-starter-integration-eventhubs
- Pour plus d’informations, consultez Spring Integration avec Azure Event Hubsspring-cloud-azure-starter-integration-servicebus
- Pour plus d’informations, consultez Spring Integration avec Azure Service Busspring-cloud-azure-starter-integration-storage-queue
- Pour plus d’informations, consultez Spring Integration with Stockage Azure Queue
Intégration spring à Azure Event Hubs
Concepts clés
Azure Event Hubs est une plateforme de streaming de Big Data et un service d’ingestion d’événements. Il peut recevoir et traiter des millions d’événements par seconde. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide d’adaptateurs de traitement par lot/stockage ou d’un fournisseur d’analyse en temps réel.
Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs. Ces adaptateurs fournissent un niveau d’abstraction supérieur sur la prise en charge de Spring pour la communication à distance, la messagerie et la planification. Le projet d’extension Spring Integration pour Event Hubs fournit des adaptateurs de canal entrants et sortants et des passerelles pour Azure Event Hubs.
Remarque
Les API de prise en charge rxJava sont supprimées de la version 4.0.0. Pour plus d’informations, consultez Javadoc.
Groupe de consommateurs
Event Hubs fournit une prise en charge similaire du groupe de consommateurs comme Apache Kafka, mais avec une légère logique différente. Bien que Kafka stocke tous les décalages validés dans le répartiteur, vous devez stocker les décalages des messages Event Hubs traités manuellement. Le Kit de développement logiciel (SDK) Event Hubs fournit la fonction pour stocker ces décalages à l’intérieur de Stockage Azure.
Prise en charge du partitionnement
Event Hubs fournit un concept similaire de partition physique comme Kafka. Mais contrairement à la rééquilibrage automatique de Kafka entre les consommateurs et les partitions, Event Hubs fournit un type de mode préemptif. Le compte de stockage agit en tant que bail pour déterminer la partition détenue par le consommateur. Lorsqu’un nouveau consommateur démarre, il tente de voler certaines partitions de la plupart des consommateurs chargés pour atteindre l’équilibrage de charge de travail.
Pour spécifier la stratégie d’équilibrage de charge, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration EventHubsContainerProperties
.
Prise en charge des consommateurs Batch
Prend EventHubsInboundChannelAdapter
en charge le mode de consommation par lots. Pour l’activer, les utilisateurs peuvent spécifier le mode d’écouteur comme ListenerMode.BATCH
lors de la construction d’une EventHubsInboundChannelAdapter
instance.
Lorsqu’elle est activée, un message dont la charge utile est une liste d’événements par lots seront reçus et transmis au canal en aval. Chaque en-tête de message est également converti en tant que liste, dont le contenu est la valeur d’en-tête associée analysée à partir de chaque événement. Pour les en-têtes communaux de l’ID de partition, case activée pointer et les dernières propriétés mises en file d’attente, ils sont présentés comme une valeur unique pour l’ensemble du lot d’événements partagent le même. Pour plus d’informations, consultez la section En-têtes de message Event Hubs .
Remarque
L’en-tête case activée point existe uniquement lorsque le mode MANUAL case activée point est utilisé.
Le point de contrôle du consommateur de lots prend en charge deux modes : BATCH
et MANUAL
. BATCH
il s’agit d’un mode de case activée point automatique pour case activée pointer l’ensemble du lot d’événements une fois qu’ils sont reçus. MANUAL
le mode consiste à case activée pointer les événements par les utilisateurs. Lorsqu’il est utilisé, le point de contrôle est passé dans l’en-tête de message, et les utilisateurs peuvent l’utiliser pour effectuer case activée pointage.
La stratégie de consommation de lots peut être spécifiée par les propriétés et max-size
max-wait-time
, où max-size
est une propriété nécessaire pendant qu’elle max-wait-time
est facultative.
Pour spécifier la stratégie de consommation de lots, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante pour obtenir un exemple de configuration EventHubsContainerProperties
.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>
Configuration
Ce démarrage fournit les 3 parties suivantes des options de configuration :
Propriétés de configuration de Connecter ion
Cette section contient les options de configuration utilisées pour la connexion à Azure Event Hubs.
Remarque
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.enabled | booléen | Indique si azure Event Hubs est activé. |
spring.cloud.azure.eventhubs.connection-string | Chaîne | Valeur chaîne de connexion de l’espace de noms Event Hubs. |
spring.cloud.azure.eventhubs.namespace | Chaîne | Valeur de l’espace de noms Event Hubs, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.eventhubs.domain-name | Chaîne | Nom de domaine d’une valeur d’espace de noms Azure Event Hubs. |
spring.cloud.azure.eventhubs.custom-endpoint-address | Chaîne | Adresse de point de terminaison personnalisée. |
spring.cloud.azure.eventhubs.shared-connection | Boolean | Indique si EventProcessorClient et EventHubProducerAsyncClient sous-jacents utilisent la même connexion. Par défaut, une nouvelle connexion est construite et utilisée pour chaque client Event Hub créé. |
Propriétés de configuration de point de contrôle
Cette section contient les options de configuration du service Stockage Blobs, qui est utilisé pour conserver la propriété de partition et les informations case activée point.
Remarque
À partir de la version 4.0.0, lorsque la propriété spring.cloud.azure.eventhubs.processor.case activée point-store.create-container-if-not-exists n’est pas activé manuellement, aucun conteneur Stockage ne sera créé automatiquement.
Contrôle des propriétés configurables de spring-cloud-azure-starter-integration-eventhubs :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.eventhubs.processor. case activée point-store.create-container-if-not-exists | Boolean | Indique s’il faut autoriser la création de conteneurs s’il n’existe pas. |
spring.cloud.azure.eventhubs.processor. case activée point-store.account-name | Chaîne | Nom du compte de stockage. |
spring.cloud.azure.eventhubs.processor. case activée point-store.account-key | Chaîne | Clé d’accès au compte de stockage. |
spring.cloud.azure.eventhubs.processor. case activée point-store.container-name | Chaîne | Stockage nom du conteneur. |
Les options de configuration courantes du Kit de développement logiciel (SDK) Azure Service sont également configurables pour Stockage magasin d’objets blob case activée point. Les options de configuration prises en charge sont introduites dans la configuration d’Azure Spring Cloud et peuvent être configurées avec le préfixe spring.cloud.azure.
unifié ou le préfixe de spring.cloud.azure.eventhubs.processor.checkpoint-store
.
Propriétés de configuration du processeur Event Hub
L’utilisation EventHubsInboundChannelAdapter
des EventProcessorClient
messages à partir d’un hub d’événements permet de configurer les propriétés globales d’un EventProcessorClient
, les développeurs peuvent utiliser EventHubsContainerProperties
pour la configuration. Consultez la section suivante sur l’utilisation EventHubsInboundChannelAdapter
de .
Utilisation de base
Envoyer des messages à Azure Event Hubs
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: eventhubs: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_SERVICE_BUS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Créez
DefaultMessageHandler
avec leEventHubsTemplate
bean pour envoyer des messages à Event Hubs.class Demo { private static final String OUTPUT_CHANNEL = "output"; private static final String EVENTHUB_NAME = "eh1"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.error("There was an error sending the message.", ex); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired EventHubOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface EventHubOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recevoir des messages d’Azure Event Hubs
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
@Configuration class Demo { @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
EventHubsInboundChannelAdapter
avec leEventHubsMessageListenerContainer
bean pour recevoir des messages d’Event Hubs.@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; private static final String EVENTHUB_NAME = "eh1"; private static final String CONSUMER_GROUP = "$Default"; @Bean public EventHubsInboundChannelAdapter messageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubsMessageListenerContainer listenerContainer) { EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); return adapter; } @Bean public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) { EventHubsContainerProperties containerProperties = new EventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL)); return new EventHubsMessageListenerContainer(processorFactory, containerProperties); } }
Créez une liaison de récepteur de messages avec EventHubsInboundChannelAdapter via le canal de message créé précédemment.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurer EventHubsMessageConverter pour personnaliser objectMapper
EventHubsMessageConverter
est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper.
Prise en charge des consommateurs Batch
Pour consommer des messages à partir d’Event Hubs par lots, il est similaire à l’exemple ci-dessus, en plus des utilisateurs doivent définir les options de configuration associées au traitement par lots pour EventHubsInboundChannelAdapter
.
Lorsque vous créez EventHubsInboundChannelAdapter
, le mode écouteur doit être défini sur BATCH
. Lors de la création d’unan, EventHubsMessageListenerContainer
définissez le mode case activée point en tant que MANUAL
ou BATCH
, et les options de traitement par lots peuvent être configurées en fonction des besoins.
@Configuration
class Demo {
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsMessageListenerContainer listenerContainer) {
EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
containerProperties.setEventHubName(EVENTHUB_NAME);
containerProperties.setConsumerGroup(CONSUMER_GROUP);
containerProperties.getBatch().setMaxSize(100);
containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
}
}
En-têtes de message Event Hubs
Le tableau suivant illustre la façon dont les propriétés de message Event Hubs sont mappées aux en-têtes de message Spring. Pour Azure Event Hubs, le message est appelé en tant que event
.
Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode d’écoute d’enregistrement :
Propriétés d’événement Event Hubs | Constantes d’en-tête de message Spring | Type | Description |
---|---|---|---|
Heure en file d’attente | EventHubsHeaders#ENQUEUED_TIME | Instantané | Instant, en UTC, de l’heure à laquelle l’événement a été mis en file d’attente dans la partition Event Hub. |
Contrepartie | EventHubsHeaders#OFFSET | Long | Décalage de l’événement lorsqu’il a été reçu de la partition Event Hub associée. |
Clé de partition | AzureHeaders#PARTITION_KEY | Chaîne | Clé de hachage de partition si elle a été définie lors de la publication initiale de l’événement. |
ID de partition | AzureHeaders#RAW_PARTITION_ID | Chaîne | ID de partition du hub d’événements. |
Numéro de séquence | EventHubsHeaders#SEQUENCE_NUMo ER | Long | Numéro de séquence affecté à l’événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée. |
Dernières propriétés d’événement en file d’attente | EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES | LastEnqueuedEventProperties | Propriétés du dernier événement mis en file d’attente dans cette partition. |
NA | AzureHeaders#CHECKPOINTER | Point de contrôle | En-tête de case activée pointez le message spécifique. |
Les utilisateurs peuvent analyser les en-têtes de message pour les informations associées de chaque événement. Pour définir un en-tête de message pour l’événement, tous les en-têtes personnalisés sont placés en tant que propriété d’application d’un événement, où l’en-tête est défini comme clé de propriété. Lorsque les événements sont reçus d’Event Hubs, toutes les propriétés de l’application sont converties en en-tête de message.
Remarque
Les en-têtes de message de la clé de partition, l’heure mise en file d’attente, le décalage et le numéro de séquence ne sont pas pris en charge pour être définis manuellement.
Lorsque le mode consommateur par lots est activé, les en-têtes spécifiques des messages par lots sont répertoriés ci-dessous, qui contient une liste de valeurs de chaque événement Event Hubs unique.
Mappage entre event Hubs Message / Propriétés d’événement et en-têtes de message Spring en mode Écouteur Batch :
Propriétés d’événement Event Hubs | Constantes d’en-tête de message Spring Batch | Type | Description |
---|---|---|---|
Heure en file d’attente | EventHubsHeaders#ENQUEUED_TIME | Liste des instantanés | Liste de l’instant, au format UTC, de l’heure à laquelle chaque événement a été mis en file d’attente dans la partition Event Hub. |
Contrepartie | EventHubsHeaders#OFFSET | Liste de longs | Liste du décalage de chaque événement lorsqu’il a été reçu de la partition Event Hub associée. |
Clé de partition | AzureHeaders#PARTITION_KEY | Liste de chaînes | Liste de la clé de hachage de partition si elle a été définie lors de la publication initiale de chaque événement. |
Numéro de séquence | EventHubsHeaders#SEQUENCE_NUMo ER | Liste de longs | Liste du numéro de séquence affecté à chaque événement lorsqu’il a été mis en file d’attente dans la partition Event Hub associée. |
Propriétés système | EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES | Liste des mappages | Liste des propriétés système de chaque événement. |
Propriétés de l’application | EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES | Liste des mappages | Liste des propriétés d’application de chaque événement, où tous les en-têtes de message personnalisés ou propriétés d’événement sont placés. |
Remarque
Lors de la publication de messages, tous les en-têtes de lot ci-dessus sont supprimés des messages s’il existe.
Exemples
Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.
Intégration de Spring à Azure Service Bus
Concepts clés
Spring Integration permet une messagerie légère dans les applications Spring et prend en charge l’intégration avec des systèmes externes via des adaptateurs déclaratifs.
Le projet d’extension Spring Integration pour Azure Service Bus fournit des adaptateurs de canal entrants et sortants pour Azure Service Bus.
Remarque
Les API de prise en charge completFuture ont été déconseillées à partir de la version 2.10.0 et sont remplacées par Reactor Core à partir de la version 4.0.0. Pour plus d’informations, consultez Javadoc.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>
Configuration
Ce démarrage fournit les 2 parties suivantes des options de configuration :
propriétés de configuration de Connecter ion
Cette section contient les options de configuration utilisées pour la connexion à Azure Service Bus.
Remarque
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-servicebus :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.servicebus.enabled | booléen | Indique si Azure Service Bus est activé. |
spring.cloud.azure.servicebus.connection-string | Chaîne | Valeur chaîne de connexion espace de noms Service Bus. |
spring.cloud.azure.servicebus.namespace | Chaîne | Valeur de l’espace de noms Service Bus, qui est le préfixe du nom de domaine complet. Un nom de domaine complet doit être composé de NamespaceName.DomainName |
spring.cloud.azure.servicebus.domain-name | Chaîne | Nom de domaine d’une valeur d’espace de noms Azure Service Bus. |
Propriétés de configuration du processeur Service Bus
L’utilisation ServiceBusInboundChannelAdapter
des ServiceBusProcessorClient
messages permet de configurer les propriétés globales d’un ServiceBusProcessorClient
, les développeurs peuvent utiliser ServiceBusContainerProperties
pour la configuration. Consultez la section suivante sur l’utilisation ServiceBusInboundChannelAdapter
de .
Utilisation de base
Envoyer des messages à Azure Service Bus
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Créez
DefaultMessageHandler
avec leServiceBusTemplate
bean pour envoyer des messages à Service Bus, définissez le type d’entité pour ServiceBusTemplate. Cet exemple utilise la file d’attente Service Bus comme exemple.class Demo { private static final String OUTPUT_CHANNEL = "queue.output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired QueueOutboundGateway messagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface QueueOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.messagingGateway.send(message); } }
Recevoir des messages d’Azure Service Bus
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
@Configuration class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
ServiceBusInboundChannelAdapter
avec leServiceBusMessageListenerContainer
bean pour recevoir des messages vers Service Bus. Cet exemple utilise la file d’attente Service Bus comme exemple.@Configuration class Demo { private static final String QUEUE_NAME = "queue1"; @Bean public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) { ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); return new ServiceBusMessageListenerContainer(processorFactory, containerProperties); } @Bean public ServiceBusInboundChannelAdapter queueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusMessageListenerContainer listenerContainer) { ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); return adapter; } }
Créez une liaison de récepteur de messages via
ServiceBusInboundChannelAdapter
le canal de message que nous avons créé précédemment.class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }
Configurer ServiceBusMessageConverter pour personnaliser objectMapper
ServiceBusMessageConverter
est fait en tant que bean configurable pour permettre aux utilisateurs de personnaliser ObjectMapper
.
En-têtes de message Service Bus
Pour certains en-têtes Service Bus qui peuvent être mappés à plusieurs constantes d’en-tête Spring, la priorité des différents en-têtes Spring est répertoriée.
Mappage entre les en-têtes Service Bus et les en-têtes Spring :
En-têtes et propriétés des messages Service Bus | Constantes d’en-tête de message Spring | Type | Configurable | Description |
---|---|---|---|---|
Type de contenu | MessageHeaders#CONTENT_TYPE |
Chaîne | Oui | Descripteur de type de contenu du message RFC2045. |
ID de corrélation : | ServiceBusMessageHeaders#CORRELATION_ID |
Chaîne | Oui | ID de corrélation du message |
ID de message | ServiceBusMessageHeaders#MESSAGE_ID |
Chaîne | Oui | ID de message du message, cet en-tête a une priorité supérieure à MessageHeaders#ID . |
ID de message | MessageHeaders#ID |
UUID | Oui | ID de message du message, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#MESSAGE_ID . |
Clé de partition | ServiceBusMessageHeaders#PARTITION_KEY |
Chaîne | Oui | Clé de partition pour l’envoi du message à une entité partitionnée. |
Répondre à | MessageHeaders#REPLY_CHANNEL |
Chaîne | Oui | Adresse d’une entité à laquelle envoyer des réponses. |
Répondre à l’ID de session | ServiceBusMessageHeaders#REPLY_TO_SESSION_ID |
Chaîne | Oui | Valeur de propriété ReplyToGroupId du message. |
Heure de file d’attente planifiée utc | ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME |
OffsetDateTime | Oui | Date/heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité supérieure à AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE . |
Heure de file d’attente planifiée utc | AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE |
Entier | Oui | Date et heure à laquelle le message doit être mis en file d’attente dans Service Bus, cet en-tête a une priorité inférieure à ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME . |
ID session | ServiceBusMessageHeaders#SESSION_ID |
Chaîne | Oui | IDentifier de session pour une entité prenant en charge la session. |
Durée de vie | ServiceBusMessageHeaders#TIME_TO_LIVE |
Durée | Oui | Durée avant l’expiration de ce message. |
À | ServiceBusMessageHeaders#TO |
Chaîne | Oui | Adresse « à » du message, réservée à une utilisation ultérieure dans les scénarios de routage et actuellement ignorée par le répartiteur lui-même. |
Objet | ServiceBusMessageHeaders#SUBJECT |
Chaîne | Oui | Objet du message. |
Description de l’erreur de lettre morte | ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION |
Chaîne | Non | Description d’un message qui a été lettre morte. |
Raison de la lettre morte | ServiceBusMessageHeaders#DEAD_LETTER_REASON |
Chaîne | Non | La raison pour laquelle un message a été mis en lettres mortes. |
Source de lettres mortes | ServiceBusMessageHeaders#DEAD_LETTER_SOURCE |
Chaîne | Non | Entité dans laquelle le message a été mis en lettres mortes. |
Nombre de livraisons | ServiceBusMessageHeaders#DELIVERY_COUNT |
long | Non | Nombre de fois où ce message a été remis aux clients. |
Numéro de séquence mis en file d’attente | ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER |
long | Non | Numéro de séquence mis en file d’attente affecté à un message par Service Bus. |
Heure en file d’attente | ServiceBusMessageHeaders#ENQUEUED_TIME |
OffsetDateTime | Non | Date à laquelle ce message a été mis en file d’attente dans Service Bus. |
Expire à | ServiceBusMessageHeaders#EXPIRES_AT |
OffsetDateTime | Non | Date d’expiration de ce message. |
Jeton de verrouillage | ServiceBusMessageHeaders#LOCK_TOKEN |
Chaîne | Non | Jeton de verrouillage pour le message actuel. |
Verrouillé jusqu’à ce que | ServiceBusMessageHeaders#LOCKED_UNTIL |
OffsetDateTime | Non | Date d’expiration du verrou de ce message. |
Numéro de séquence | ServiceBusMessageHeaders#SEQUENCE_NUMBER |
long | Non | Numéro unique affecté à un message par Service Bus. |
State | ServiceBusMessageHeaders#STATE |
ServiceBusMessageState | Non | État du message, qui peut être actif, différé ou planifié. |
Prise en charge des clés de partition
Ce démarrage prend en charge le partitionnement Service Bus en autorisant la définition de la clé de partition et de l’ID de session dans l’en-tête du message. Cette section explique comment définir la clé de partition pour les messages.
Recommandé : Utilisez ServiceBusMessageHeaders.PARTITION_KEY
comme clé de l’en-tête.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Non recommandé mais actuellement pris en charge :AzureHeaders.PARTITION_KEY
comme clé de l’en-tête.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Remarque
Lorsque les deux ServiceBusMessageHeaders.PARTITION_KEY
et AzureHeaders.PARTITION_KEY
sont définis dans les en-têtes de message, ServiceBusMessageHeaders.PARTITION_KEY
il est préférable.
Prise en charge des sessions
Cet exemple montre comment définir manuellement l’ID de session d’un message dans l’application.
public class SampleController {
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestParam String message) {
LOGGER.info("Going to add message {} to Sinks.Many.", message);
many.emitNext(MessageBuilder.withPayload(message)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
return ResponseEntity.ok("Sent!");
}
}
Remarque
Lorsque l’élément ServiceBusMessageHeaders.SESSION_ID
est défini dans les en-têtes de message et qu’un en-tête différent ServiceBusMessageHeaders.PARTITION_KEY
est également défini, la valeur de l’ID de session sera finalement utilisée pour remplacer la valeur de la clé de partition.
Exemples
Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.
Intégration de Spring au stockage File d’attente Azure
Concepts clés
Stockage File d’attente Azure est un service permettant de stocker un grand nombre de messages. Vous accédez aux messages depuis n’importe où dans le monde par le biais d’appels authentifiés à l’aide du protocole HTTP ou HTTPS. La taille maximale d’un message de file d’attente est de 64 Ko. Une file d’attente peut contenir des millions de messages, dans la limite de la capacité totale d’un compte de stockage. Les files d’attente sont couramment utilisées pour créer un backlog de travail à traiter de façon asynchrone.
Configuration des dépendances
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>
Configuration
Ce démarrage fournit les options de configuration suivantes :
propriétés de configuration de Connecter ion
Cette section contient les options de configuration utilisées pour la connexion à Stockage Azure File d’attente.
Remarque
Si vous choisissez d’utiliser un principal de sécurité pour authentifier et autoriser avec l’ID Microsoft Entra pour accéder à une ressource Azure, consultez Autoriser l’accès avec l’ID Microsoft Entra pour vous assurer que le principal de sécurité a reçu l’autorisation suffisante pour accéder à la ressource Azure.
propriétés configurables Connecter ion de spring-cloud-azure-starter-integration-storage-queue :
Propriété | Type | Description |
---|---|---|
spring.cloud.azure.storage.queue.enabled | booléen | Indique si une file d’attente Stockage Azure est activée. |
spring.cloud.azure.storage.queue.connection-string | Chaîne | Stockage valeur d’espace de noms de file d’attente chaîne de connexion. |
spring.cloud.azure.storage.queue.accountName | Chaîne | Stockage nom du compte de file d’attente. |
spring.cloud.azure.storage.queue.accountKey | Chaîne | Stockage clé de compte de file d’attente. |
spring.cloud.azure.storage.queue.endpoint | Chaîne | Stockage point de terminaison du service file d’attente. |
spring.cloud.azure.storage.queue.sasToken | Chaîne | Informations d’identification du jeton Sas |
spring.cloud.azure.storage.queue.serviceVersion | QueueServiceVersion | QueueServiceVersion utilisée lors de l’envoi de requêtes d’API. |
spring.cloud.azure.storage.queue.messageEncoding | Chaîne | Encodage des messages de file d’attente. |
Utilisation de base
Envoyer des messages à Stockage Azure File d’attente
Renseignez les options de configuration des informations d’identification.
Pour les informations d’identification en tant que chaîne de connexion, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: storage: queue: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
Pour les informations d’identification en tant qu’identités managées, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Pour les informations d’identification en tant que principal de service, configurez les propriétés suivantes dans votre fichier application.yml :
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> storage: queue: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
Remarque
Les valeurs autorisées tenant-id
sont : common
, organizations
, consumers
ou l’ID de locataire. Pour plus d’informations sur ces valeurs, consultez la section Utiliser le point de terminaison incorrect (comptes personnels et d’organisation) de l’erreur AADSTS50020 - Le compte d’utilisateur du fournisseur d’identité n’existe pas dans le locataire. Pour plus d’informations sur la conversion de votre application monolocataire, consultez Convertir une application monolocataire en multilocataire sur l’ID Microsoft Entra.
Créez
DefaultMessageHandler
avec leStorageQueueTemplate
haricot pour envoyer des messages à Stockage File d’attente.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; private static final String OUTPUT_CHANNEL = "output"; @Bean @ServiceActivator(inputChannel = OUTPUT_CHANNEL) public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) { DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(new ListenableFutureCallback<Void>() { @Override public void onSuccess(Void result) { LOGGER.info("Message was sent successfully."); } @Override public void onFailure(Throwable ex) { LOGGER.info("There was an error sending the message."); } }); return handler; } }
Créez une liaison de passerelle de message avec le gestionnaire de messages ci-dessus via un canal de message.
class Demo { @Autowired StorageQueueOutboundGateway storageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) public interface StorageQueueOutboundGateway { void send(String text); } }
Envoyez des messages à l’aide de la passerelle.
class Demo { public void demo() { this.storageQueueOutboundGateway.send(message); } }
Recevoir des messages de Stockage Azure File d’attente
Renseignez les options de configuration des informations d’identification.
Créez unan de canal de message en tant que canal d’entrée.
class Demo { private static final String INPUT_CHANNEL = "input"; @Bean public MessageChannel input() { return new DirectChannel(); } }
Créez
StorageQueueMessageSource
avec leStorageQueueTemplate
bean pour recevoir des messages dans Stockage File d’attente.class Demo { private static final String STORAGE_QUEUE_NAME = "example"; @Bean @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) { return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
Créez une liaison de récepteur de messages avec Stockage QueueMessageSource créée à la dernière étape via le canal de message que nous avons créé précédemment.
class Demo { @ServiceActivator(inputChannel = INPUT_CHANNEL) public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) { String message = new String(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }
Exemples
Pour plus d’informations, consultez le référentiel azure-spring-boot-samples sur GitHub.