Dela via


Spring Cloud Stream med Azure Service Bus

Den här artikeln visar hur du använder Spring Cloud Stream Binder för att skicka meddelanden till och ta emot meddelanden från Service Bus queues och topics.

Azure tillhandahåller en asynkron meddelandeplattform som kallas Azure Service Bus ("Service Bus") som baseras på standarden Advanced Message Queueing Protocol 1-0 ("AMQP 1.0"). Service Bus kan användas i flera Azure-plattformar som stöds.

Förutsättningar

Kommentar

Om du vill ge ditt konto åtkomst till dina Azure Service Bus-resurser tilldelar Azure Service Bus Data Sender du rollen och Azure Service Bus Data Receiver till det Microsoft Entra-konto som du använder för närvarande. Mer information om hur du beviljar åtkomstroller finns i Tilldela Azure-roller med hjälp av Azure Portal och Autentisera och auktorisera ett program med Microsoft Entra-ID för åtkomst till Azure Service Bus-entiteter.

Viktigt!

Spring Boot version 2.5 eller senare krävs för att slutföra stegen i den här artikeln.

Skicka och ta emot meddelanden från Azure Service Bus

Med en kö eller ett ämne för Azure Service Bus kan du skicka och ta emot meddelanden med Hjälp av Spring Cloud Azure Stream Binder Service Bus.

Om du vill installera Spring Cloud Azure Stream Binder Service Bus-modulen lägger du till följande beroenden i din pom.xml-fil :

  • Spring Cloud Azure Bill of Materials (BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.19.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    Kommentar

    Om du använder Spring Boot 2.x måste du ange spring-cloud-azure-dependencies versionen till 4.19.0. Den här strukturlistan (BOM) bör konfigureras i avsnittet i <dependencyManagement> din pom.xml-fil . Detta säkerställer att alla Spring Cloud Azure-beroenden använder samma version. Mer information om vilken version som används för den här strukturlistan finns i Vilken version av Spring Cloud Azure ska jag använda.

  • Spring Cloud Azure Stream Binder Service Bus-artefakten:

    <dependency>
         <groupId>com.azure.spring</groupId>
         <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
    </dependency>
    

Koda appen

Använd följande steg för att konfigurera ditt program att använda en Service Bus-kö eller ett ämne för att skicka och ta emot meddelanden.

  1. Konfigurera Service Bus-autentiseringsuppgifterna i konfigurationsfilen application.properties.

     spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME}
     spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false
     spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.fixed-delay=60000 
     spring.cloud.stream.poller.initial-delay=0
    

    I följande tabell beskrivs fälten i konfigurationen:

    Fält beskrivning
    spring.cloud.azure.servicebus.namespace Ange det namnområde som du fick i Service Bus från Azure Portal.
    spring.cloud.stream.bindings.consume-in-0.destination Ange Service Bus-kö eller Service Bus-ämne som du använde i den här självstudien.
    spring.cloud.stream.bindings.supply-out-0.destination Ange samma värde som används som indatamål.
    spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete Ange om meddelanden ska regleras automatiskt. Om det anges som falskt läggs ett meddelandehuvud Checkpointer till för att göra det möjligt för utvecklare att reglera meddelanden manuellt.
    spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type Ange entitetstypen för utdatabindningen, kan vara queue eller topic.
    spring.cloud.function.definition Ange vilken funktionell böna som ska bindas till de externa mål som exponeras av bindningarna.
    spring.cloud.stream.poller.fixed-delay Ange en fast fördröjning för standardsökaren i millisekunder. Standardvärdet är 1 000 L. Det rekommenderade värdet är 60000.
    spring.cloud.stream.poller.initial-delay Ange inledande fördröjning för periodiska utlösare. Standardvärdet är 0.
  2. Redigera startklassfilen för att visa följande innehåll.

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class ServiceBusQueueBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(ServiceBusQueueBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}'", message.getPayload());
                checkpointer.success()
                            .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                            .doOnError(e->LOGGER.error("Error found", e))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    Dricks

    I den här självstudien finns det inga autentiseringsåtgärder i konfigurationerna eller koden. Att ansluta till Azure-tjänster kräver dock autentisering. För att slutföra autentiseringen måste du använda Azure Identity. Spring Cloud Azure använder DefaultAzureCredential, som Azure Identity-biblioteket tillhandahåller för att hjälpa dig att få autentiseringsuppgifter utan några kodändringar.

    DefaultAzureCredential stöder flera autentiseringsmetoder och avgör vilken metod som ska användas vid körning. Med den här metoden kan din app använda olika autentiseringsmetoder i olika miljöer (till exempel lokala miljöer och produktionsmiljöer) utan att implementera miljöspecifik kod. Mer information finns i DefaultAzureCredential.

    För att slutföra autentiseringen i lokala utvecklingsmiljöer kan du använda Azure CLI, Visual Studio Code, PowerShell eller andra metoder. Mer information finns i Azure-autentisering i Java-utvecklingsmiljöer. För att slutföra autentiseringen i Azure-värdmiljöer rekommenderar vi att du använder användartilldelad hanterad identitet. Mer information finns i Vad är hanterade identiteter för Azure-resurser?

  3. Starta programmet. Meddelanden som i följande exempel publiceras i programloggen:

    New message received: 'Hello World'
    Message 'Hello World' successfully checkpointed
    

Nästa steg