Compartilhar via


Suporte do Spring Cloud Azure para o Spring Integration

Este artigo se aplica a:✅ versão 4.19.0 ✅ versão 5.19.0

A Spring Integration Extension para Azure fornece adaptadores do Spring Integration para os vários serviços fornecidos pelo SDK do Azure para Java. Fornecemos suporte ao Spring Integration para esses serviços do Azure: Hubs de Eventos, Barramento de Serviço, Fila de Armazenamento. Veja a seguir uma lista de adaptadores com suporte:

Integração do Spring com os Hubs de Eventos do Azure

Principais conceitos

Os Hubs de Eventos do Azure são uma plataforma de streaming de Big Data e um serviço de ingestão de eventos. Ele pode receber e processar milhões de eventos por segundo. Os dados enviados para um hub de eventos podem ser transformados e armazenados usando qualquer provedor de análise em tempo real ou adaptadores de lote/armazenamento.

O Spring Integration permite mensagens leves em aplicativos baseados em Spring e dá suporte à integração com sistemas externos por meio de adaptadores declarativos. Esses adaptadores fornecem um nível mais alto de abstração sobre o suporte da Spring para comunicação remota, mensagens e agendamento. O projeto de extensão Spring Integration for Event Hubs fornece adaptadores e gateways de canal de entrada e saída para Os Hubs de Eventos do Azure.

Nota

As APIs de suporte do RxJava são removidas da versão 4.0.0. Consulte Javadoc para obter detalhes.

Grupo de consumidores

Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores como o Apache Kafka, mas com uma lógica ligeiramente diferente. Embora o Kafka armazene todos os deslocamentos confirmados no agente, você precisa armazenar deslocamentos de mensagens dos Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.

Suporte ao particionamento

Os Hubs de Eventos fornecem um conceito semelhante de partição física como Kafka. Mas, ao contrário do rebalanceamento automático do Kafka entre consumidores e partições, os Hubs de Eventos fornecem uma espécie de modo preemptivo. A conta de armazenamento atua como uma concessão para determinar qual partição pertence a qual consumidor. Quando um novo consumidor for iniciado, ele tentará roubar algumas partições da maioria dos consumidores carregados para obter o balanceamento de carga de trabalho.

Para especificar a estratégia de balanceamento de carga, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerProperties.

Suporte ao consumidor do lote

O EventHubsInboundChannelAdapter dá suporte ao modo de consumo em lote. Para habilitá-lo, os usuários podem especificar o modo de ouvinte como ListenerMode.BATCH ao construir uma instância de EventHubsInboundChannelAdapter. Quando habilitada, uma mensagem da qual o conteúdo é uma lista de eventos em lote será recebida e passada para o canal downstream. Cada cabeçalho de mensagem também é convertido como uma lista, da qual o conteúdo é o valor de cabeçalho associado analisado de cada evento. Para os cabeçalhos comuns da ID de partição, do ponto de verificação e das últimas propriedades enfileiradas, eles são apresentados como um único valor para todo o lote de eventos que compartilha o mesmo. Para obter mais informações, consulte a seção cabeçalhos de mensagem dos Hubs de Eventos .

Nota

O cabeçalho de ponto de verificação só existe quando modo manual ponto de verificação é usado.

O ponto de verificação do consumidor em lotes dá suporte a dois modos: BATCH e MANUAL. BATCH modo é um modo de ponto de verificação automático para verificar todo o lote de eventos juntos depois que eles são recebidos. MANUAL modo é verificar os eventos pelos usuários. Quando usado, o do Checkpointer será passado para o cabeçalho da mensagem e os usuários poderão usá-lo para fazer o ponto de verificação.

A política de consumo em lote pode ser especificada por propriedades de max-size e max-wait-time, em que max-size é uma propriedade necessária enquanto max-wait-time é opcional. Para especificar a estratégia de consumo em lote, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerProperties.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuração

Este início fornece as seguintes 3 partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão de spring-cloud-azure-starter-integration-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.enabled booleano Se um Hub de Eventos do Azure está habilitado.
spring.cloud.azure.eventhubs.connection-string Corda Valor da cadeia de conexão do Namespace dos Hubs de Eventos.
spring.cloud.azure.eventhubs.namespace Corda Valor do Namespace dos Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Corda Nome de domínio de um valor de namespace dos Hubs de Eventos do Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address Corda Endereço do ponto de extremidade personalizado.
spring.cloud.azure.eventhubs.shared-connection Booleano Se o EventProcessorClient e EventHubProducerAsyncClient subjacentes usam a mesma conexão. Por padrão, uma nova conexão é construída e usada criada para cada cliente do Hub de Eventos criado.

Propriedades de configuração de ponto de verificação

Esta seção contém as opções de configuração para o serviço Blobs de Armazenamento, que é usado para persistir a propriedade da partição e informações de ponto de verificação.

Nota

Na versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente.

Propriedades configuráveis de ponto de verificação do spring-cloud-azure-starter-integration-eventhubs:

Propriedade Tipo Descrição
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Se deseja permitir a criação de contêineres se não existir.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Corda Nome da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Corda Chave de acesso da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Corda Nome do contêiner de armazenamento.

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o repositório de ponto de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas no de configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado spring.cloud.azure. ou o prefixo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriedades de configuração do processador do Hub de Eventos

O EventHubsInboundChannelAdapter usa o EventProcessorClient para consumir mensagens de um hub de eventos, para configurar as propriedades gerais de um EventProcessorClient, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a seção a seguir sobre como trabalhar com EventHubsInboundChannelAdapter.

Uso básico

Enviar mensagens aos Hubs de Eventos do Azure

  1. Preencha as opções de configuração de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_EVENT_HUBS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o bean EventHubsTemplate para enviar mensagens aos Hubs de Eventos.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Crie uma associação de gateway de mensagem com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Receber mensagens dos Hubs de Eventos do Azure

  1. Preencha as opções de configuração de credencial.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie EventHubsInboundChannelAdapter com o bean EventHubsMessageListenerContainer para receber mensagens dos Hubs de Eventos.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Crie uma associação de receptor de mensagem com EventHubsInboundChannelAdapter por meio do canal de mensagem criado antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurar EventHubsMessageConverter para personalizar objectMapper

EventHubsMessageConverter é feito como um bean configurável para permitir que os usuários personalizem ObjectMapper.

Suporte ao consumidor do lote

Para consumir mensagens de Hubs de Eventos em lotes é semelhante ao exemplo acima, além disso, os usuários devem definir as opções de configuração relacionadas ao consumo em lote para EventHubsInboundChannelAdapter.

Ao criar EventHubsInboundChannelAdapter, o modo de ouvinte deve ser definido como BATCH. Ao criar bean de EventHubsMessageListenerContainer, defina o modo de ponto de verificação como MANUAL ou BATCHe as opções de lote podem ser configuradas conforme necessário.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Cabeçalhos de mensagem dos Hubs de Eventos

A tabela a seguir ilustra como as propriedades de mensagem dos Hubs de Eventos são mapeadas para cabeçalhos de mensagem spring. Para os Hubs de Eventos do Azure, a mensagem é chamada como event.

Mapeamento entre as propriedades de mensagem/evento dos Hubs de Eventos e cabeçalhos de mensagem spring no modo ouvinte de registro:

Propriedades de eventos dos Hubs de Eventos Constantes de cabeçalho de mensagem de mola Tipo Descrição
Tempo enfileirado EventHubsHeaders#ENQUEUED_TIME Instante O instantâneo, em UTC, de quando o evento foi enfileirado na partição do Hub de Eventos.
Offset EventHubsHeaders#OFFSET Longas O deslocamento do evento quando ele foi recebido da partição do Hub de Eventos associada.
Chave de partição AzureHeaders#PARTITION_KEY Corda A chave de hash de partição se ela foi definida ao publicar originalmente o evento.
ID da partição AzureHeaders#RAW_PARTITION_ID Corda A ID de partição do Hub de Eventos.
Número da sequência EventHubsHeaders#SEQUENCE_NUMBER Longas O número de sequência atribuído ao evento quando ele foi enfileirado na partição do Hub de Eventos associada.
Propriedades do último evento enfileirado EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties As propriedades do último evento enfileirado nesta partição.
NA AzureHeaders#CHECKPOINTER Ponto de verificação O cabeçalho do ponto de verificação da mensagem específica.

Os usuários podem analisar os cabeçalhos da mensagem para obter as informações relacionadas de cada evento. Para definir um cabeçalho de mensagem para o evento, todos os cabeçalhos personalizados serão colocados como uma propriedade de aplicativo de um evento, em que o cabeçalho é definido como a chave de propriedade. Quando os eventos forem recebidos dos Hubs de Eventos, todas as propriedades do aplicativo serão convertidas no cabeçalho da mensagem.

Nota

Não há suporte para cabeçalhos de mensagem de chave de partição, tempo enfileirado, deslocamento e número de sequência para serem definidos manualmente.

Quando o modo de consumidor em lote está habilitado, os cabeçalhos específicos de mensagens em lote são listados a seguir, que contém uma lista de valores de cada evento dos Hubs de Eventos.

Mapeamento entre as propriedades de mensagem/evento dos Hubs de Eventos e cabeçalhos de mensagem spring no modo ouvinte do lote:

Propriedades de eventos dos Hubs de Eventos Constantes de cabeçalho de mensagem do Lote do Spring Tipo Descrição
Tempo enfileirado EventHubsHeaders#ENQUEUED_TIME Lista de instantâneos Lista do instantâneo, em UTC, de quando cada evento foi enfileirado na partição do Hub de Eventos.
Offset EventHubsHeaders#OFFSET Lista de Long Lista do deslocamento de cada evento quando ele foi recebido da partição do Hub de Eventos associada.
Chave de partição AzureHeaders#PARTITION_KEY Lista de cadeia de caracteres Lista da chave de hash de partição se ela foi definida ao publicar originalmente cada evento.
Número da sequência EventHubsHeaders#SEQUENCE_NUMBER Lista de Long Lista do número de sequência atribuído a cada evento quando ele foi enfileirado na partição do Hub de Eventos associada.
Propriedades do sistema EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista de Mapas Lista das propriedades do sistema de cada evento.
Propriedades do aplicativo EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista de Mapas Lista das propriedades do aplicativo de cada evento, em que todos os cabeçalhos de mensagem personalizados ou propriedades de evento são colocados.

Nota

Ao publicar mensagens, todos os cabeçalhos de lote acima serão removidos das mensagens, se existirem.

Amostras

Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.

Integração do Spring com o Barramento de Serviço do Azure

Principais conceitos

O Spring Integration permite mensagens leves em aplicativos baseados em Spring e dá suporte à integração com sistemas externos por meio de adaptadores declarativos.

O projeto de extensão Spring Integration para Barramento de Serviço do Azure fornece adaptadores de canal de entrada e saída para o Barramento de Serviço do Azure.

Nota

As APIs de suporte do CompletableFuture foram preteridas da versão 2.10.0 e substituídas pelo Reactor Core da versão 4.0.0. Consulte Javadoc para obter detalhes.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuração

Este início fornece as duas seguintes partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão do spring-cloud-azure-starter-integration-servicebus:

Propriedade Tipo Descrição
spring.cloud.azure.servicebus.enabled booleano Se um Barramento de Serviço do Azure está habilitado.
spring.cloud.azure.servicebus.connection-string Corda Valor da cadeia de conexão do Namespace do Barramento de Serviço.
spring.cloud.azure.servicebus.custom-endpoint-address Corda O endereço de ponto de extremidade personalizado a ser usado ao se conectar ao Barramento de Serviço.
spring.cloud.azure.servicebus.namespace Corda Valor do Namespace do Barramento de Serviço, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Corda Nome de domínio de um valor de namespace do Barramento de Serviço do Azure.

Propriedades de configuração do processador do Barramento de Serviço

O ServiceBusInboundChannelAdapter usa o ServiceBusProcessorClient para consumir mensagens, para configurar as propriedades gerais de um ServiceBusProcessorClient, os desenvolvedores podem usar ServiceBusContainerProperties para a configuração. Consulte a seção a seguir sobre como trabalhar com ServiceBusInboundChannelAdapter.

Uso básico

Enviar mensagens para o Barramento de Serviço do Azure

  1. Preencha as opções de configuração de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o bean ServiceBusTemplate para enviar mensagens ao Barramento de Serviço, defina o tipo de entidade para o ServiceBusTemplate. Este exemplo usa a Fila do Barramento de Serviço como exemplo.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Crie uma associação de gateway de mensagem com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Receber mensagens do Barramento de Serviço do Azure

  1. Preencha as opções de configuração de credencial.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie ServiceBusInboundChannelAdapter com o bean ServiceBusMessageListenerContainer para receber mensagens no Barramento de Serviço. Este exemplo usa a Fila do Barramento de Serviço como exemplo.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Crie uma associação de receptor de mensagem com ServiceBusInboundChannelAdapter por meio do canal de mensagem que criamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurar ServiceBusMessageConverter para personalizar objectMapper

ServiceBusMessageConverter é feito como um bean configurável para permitir que os usuários personalizem ObjectMapper.

Cabeçalhos de mensagem do Barramento de Serviço

Para alguns cabeçalhos do Barramento de Serviço que podem ser mapeados para várias constantes de cabeçalho spring, a prioridade de cabeçalhos spring diferentes é listada.

Mapeamento entre cabeçalhos do Barramento de Serviço e Cabeçalhos spring:

Propriedades e cabeçalhos de mensagem do Barramento de Serviço Constantes de cabeçalho de mensagem de mola Tipo Configurável Descrição
Tipo de conteúdo MessageHeaders#CONTENT_TYPE Corda Sim O descritor RFC2045 tipo de conteúdo da mensagem.
ID de correlação ServiceBusMessageHeaders#CORRELATION_ID Corda Sim A ID de correlação da mensagem
ID da mensagem ServiceBusMessageHeaders#MESSAGE_ID Corda Sim A ID da mensagem, esse cabeçalho tem prioridade maior do que MessageHeaders#ID.
ID da mensagem MessageHeaders#ID UUID Sim A ID da mensagem, esse cabeçalho tem prioridade menor do que ServiceBusMessageHeaders#MESSAGE_ID.
Chave de partição ServiceBusMessageHeaders#PARTITION_KEY Corda Sim A chave de partição para enviar a mensagem para uma entidade particionada.
Responder a MessageHeaders#REPLY_CHANNEL Corda Sim O endereço de uma entidade para a qual enviar respostas.
Responder à ID da sessão ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Corda Sim O valor da propriedade ReplyToGroupId da mensagem.
Hora de enfileiramento agendada utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Sim O datetime em que a mensagem deve ser enfileirada no Barramento de Serviço, esse cabeçalho tem prioridade maior do que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Hora de enfileiramento agendada utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Inteiro Sim O datetime em que a mensagem deve ser enfileirada no Barramento de Serviço, esse cabeçalho tem prioridade menor do que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID da sessão ServiceBusMessageHeaders#SESSION_ID Corda Sim O IDentifier de sessão para uma entidade com reconhecimento de sessão.
Tempo de vida útil ServiceBusMessageHeaders#TIME_TO_LIVE Duração Sim A duração do tempo antes que essa mensagem expire.
Para ServiceBusMessageHeaders#TO Corda Sim O endereço "para" da mensagem, reservado para uso futuro em cenários de roteamento e atualmente ignorado pelo próprio agente.
Assunto ServiceBusMessageHeaders#SUBJECT Corda Sim O assunto da mensagem.
Descrição do erro de carta morta ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Corda Não A descrição de uma mensagem que foi morta.
Motivo da carta morta ServiceBusMessageHeaders#DEAD_LETTER_REASON Corda Não A razão pela qual uma mensagem foi enviada como morta.
Fonte de carta morta ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Corda Não A entidade na qual a mensagem foi enviada como morta.
Contagem de entrega ServiceBusMessageHeaders#DELIVERY_COUNT Longas Não O número de vezes que essa mensagem foi entregue aos clientes.
Número da sequência enfileirada ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER Longas Não O número de sequência enfileirado atribuído a uma mensagem pelo Barramento de Serviço.
Tempo enfileirado ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Não O datetime em que essa mensagem foi enfileirada no Barramento de Serviço.
Expira em ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Não O datetime em que essa mensagem expirará.
Token de bloqueio ServiceBusMessageHeaders#LOCK_TOKEN Corda Não O token de bloqueio da mensagem atual.
Bloqueado até ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Não O datetime em que o bloqueio dessa mensagem expira.
Número da sequência ServiceBusMessageHeaders#SEQUENCE_NUMBER Longas Não O número exclusivo atribuído a uma mensagem pelo Barramento de Serviço.
Estado ServiceBusMessageHeaders#STATE ServiceBusMessageState Não O estado da mensagem, que pode ser Ativa, Adiada ou Agendada.

Suporte à chave de partição

Esse início dá suporte de particionamento do Barramento de Serviço, permitindo a configuração da chave de partição e da ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.

Recomendado: Usar ServiceBusMessageHeaders.PARTITION_KEY como a chave do cabeçalho.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Não recomendado, mas com suporte no momento:AzureHeaders.PARTITION_KEY como a chave do cabeçalho.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando ServiceBusMessageHeaders.PARTITION_KEY e AzureHeaders.PARTITION_KEY são definidos nos cabeçalhos da mensagem, ServiceBusMessageHeaders.PARTITION_KEY é preferencial.

Suporte à sessão

Este exemplo demonstra como definir manualmente a ID da sessão de uma mensagem no aplicativo.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando o ServiceBusMessageHeaders.SESSION_ID é definido nos cabeçalhos da mensagem e um cabeçalho ServiceBusMessageHeaders.PARTITION_KEY diferente também é definido, o valor da ID da sessão será eventualmente usado para substituir o valor da chave de partição.

Personalizar propriedades do cliente do Barramento de Serviço

Os desenvolvedores podem usar AzureServiceClientBuilderCustomizer para personalizar as propriedades do Cliente do Barramento de Serviço. O exemplo a seguir personaliza a propriedade sessionIdleTimeout em ServiceBusClientBuilder:

@Bean
public AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() {
    return builder -> builder.sessionIdleTimeout(Duration.ofSeconds(10));
}

Amostras

Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.

Integração do Spring com a Fila de Armazenamento do Azure

Principais conceitos

O Armazenamento de Filas do Azure é um serviço para armazenar um grande número de mensagens. Você acessa mensagens de qualquer lugar do mundo por meio de chamadas autenticadas usando HTTP ou HTTPS. Uma mensagem de fila pode ter até 64 KB de tamanho. Uma fila pode conter milhões de mensagens, até o limite total de capacidade de uma conta de armazenamento. As filas geralmente são usadas para criar um backlog de trabalho para processar de forma assíncrona.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuração

Este início fornece as seguintes opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar à Fila de Armazenamento do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar o acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão do spring-cloud-azure-starter-integration-storage-queue:

Propriedade Tipo Descrição
spring.cloud.azure.storage.queue.enabled booleano Se uma Fila de Armazenamento do Azure está habilitada.
spring.cloud.azure.storage.queue.connection-string Corda Valor da cadeia de conexão namespace da fila de armazenamento.
spring.cloud.azure.storage.queue.accountName Corda Nome da conta da Fila de Armazenamento.
spring.cloud.azure.storage.queue.accountKey Corda Chave da conta da Fila de Armazenamento.
spring.cloud.azure.storage.queue.endpoint Corda Ponto de extremidade do serviço Fila de Armazenamento.
spring.cloud.azure.storage.queue.sasToken Corda Credencial de token Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion que é usado ao fazer solicitações de API.
spring.cloud.azure.storage.queue.messageEncoding Corda Codificação de mensagens de fila.

Uso básico

Enviar mensagens para a Fila de Armazenamento do Azure

  1. Preencha as opções de configuração de credencial.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades em seu arquivo application.yml:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
      

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como entidade de serviço, configure as seguintes propriedades em seu arquivo application.yml:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}
    

Nota

Os valores permitidos para tenant-id são: common, organizations, consumersou a ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado no ponto de extremidade incorreto (contas pessoais e de organização) da Error AADSTS50020 – A conta de usuário do provedor de identidade não existe node locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o bean StorageQueueTemplate para enviar mensagens para a Fila de Armazenamento.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Crie uma associação de gateway de mensagem com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Receber mensagens da Fila de Armazenamento do Azure

  1. Preencha as opções de configuração de credencial.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie StorageQueueMessageSource com o bean StorageQueueTemplate para receber mensagens na Fila de Armazenamento.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Crie uma associação de receptor de mensagem com StorageQueueMessageSource criado na última etapa por meio do canal de mensagem que criamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Amostras

Para obter mais informações, consulte o azure-spring-boot-samples repositório no GitHub.