Spring Cloud Stream med Azure Service Bus
Den här artikeln visar hur du använder Spring Cloud Stream Binder för att skicka meddelanden till och ta emot meddelanden från Service Bus queues
och topics
.
Azure tillhandahåller en asynkron meddelandeplattform som kallas Azure Service Bus ("Service Bus") som baseras på standarden Advanced Message Queueing Protocol 1-0 ("AMQP 1.0"). Service Bus kan användas i flera Azure-plattformar som stöds.
Förutsättningar
En Azure-prenumeration – skapa en kostnadsfritt.
Java Development Kit (JDK) version 8 eller senare.
Apache Maven, version 3.2 eller senare.
cURL eller ett liknande HTTP-verktyg för att testa funktioner.
En kö eller ett ämne för Azure Service Bus. Om du inte har någon skapar du en Service Bus-kö eller skapar ett Service Bus-ämne.
Ett Spring Boot-program. Om du inte har ett skapar du ett Maven-projekt med Spring Initializr. Se till att välja Maven Project och under Beroenden lägger du till Beroenden för Spring Web och Azure Support och väljer sedan Java version 8 eller senare.
Kommentar
Om du vill ge ditt konto åtkomst till dina Azure Service Bus-resurser tilldelar Azure Service Bus Data Sender
du rollen och Azure Service Bus Data Receiver
till det Microsoft Entra-konto som du använder för närvarande. Mer information om hur du beviljar åtkomstroller finns i Tilldela Azure-roller med hjälp av Azure Portal och Autentisera och auktorisera ett program med Microsoft Entra-ID för åtkomst till Azure Service Bus-entiteter.
Viktigt!
Spring Boot version 2.5 eller senare krävs för att slutföra stegen i den här artikeln.
Skicka och ta emot meddelanden från Azure Service Bus
Med en kö eller ett ämne för Azure Service Bus kan du skicka och ta emot meddelanden med Hjälp av Spring Cloud Azure Stream Binder Service Bus.
Om du vill installera Spring Cloud Azure Stream Binder Service Bus-modulen lägger du till följande beroenden i din pom.xml-fil :
Spring Cloud Azure Bill of Materials (BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
Kommentar
Om du använder Spring Boot 2.x måste du ange
spring-cloud-azure-dependencies
versionen till4.19.0
. Den här strukturlistan (BOM) bör konfigureras i avsnittet i<dependencyManagement>
din pom.xml-fil . Detta säkerställer att alla Spring Cloud Azure-beroenden använder samma version. Mer information om vilken version som används för den här strukturlistan finns i Vilken version av Spring Cloud Azure ska jag använda.Spring Cloud Azure Stream Binder Service Bus-artefakten:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
Koda appen
Använd följande steg för att konfigurera ditt program att använda en Service Bus-kö eller ett ämne för att skicka och ta emot meddelanden.
Konfigurera Service Bus-autentiseringsuppgifterna i konfigurationsfilen
application.properties
.spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0
I följande tabell beskrivs fälten i konfigurationen:
Fält beskrivning spring.cloud.azure.servicebus.namespace
Ange det namnområde som du fick i Service Bus från Azure Portal. spring.cloud.stream.bindings.consume-in-0.destination
Ange Service Bus-kö eller Service Bus-ämne som du använde i den här självstudien. spring.cloud.stream.bindings.supply-out-0.destination
Ange samma värde som används som indatamål. spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete
Ange om meddelanden ska regleras automatiskt. Om det anges som falskt läggs ett meddelandehuvud Checkpointer
till för att göra det möjligt för utvecklare att reglera meddelanden manuellt.spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type
Ange entitetstypen för utdatabindningen, kan vara queue
ellertopic
.spring.cloud.function.definition
Ange vilken funktionell böna som ska bindas till de externa mål som exponeras av bindningarna. spring.cloud.stream.poller.fixed-delay
Ange en fast fördröjning för standardsökaren i millisekunder. Standardvärdet är 1 000 L. Det rekommenderade värdet är 60000. spring.cloud.stream.poller.initial-delay
Ange inledande fördröjning för periodiska utlösare. Standardvärdet är 0. Redigera startklassfilen för att visa följande innehåll.
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }
Dricks
I den här självstudien finns det inga autentiseringsåtgärder i konfigurationerna eller koden. Att ansluta till Azure-tjänster kräver dock autentisering. För att slutföra autentiseringen måste du använda Azure Identity. Spring Cloud Azure använder
DefaultAzureCredential
, som Azure Identity-biblioteket tillhandahåller för att hjälpa dig att få autentiseringsuppgifter utan några kodändringar.DefaultAzureCredential
stöder flera autentiseringsmetoder och avgör vilken metod som ska användas vid körning. Med den här metoden kan din app använda olika autentiseringsmetoder i olika miljöer (till exempel lokala miljöer och produktionsmiljöer) utan att implementera miljöspecifik kod. Mer information finns i DefaultAzureCredential.För att slutföra autentiseringen i lokala utvecklingsmiljöer kan du använda Azure CLI, Visual Studio Code, PowerShell eller andra metoder. Mer information finns i Azure-autentisering i Java-utvecklingsmiljöer. För att slutföra autentiseringen i Azure-värdmiljöer rekommenderar vi att du använder användartilldelad hanterad identitet. Mer information finns i Vad är hanterade identiteter för Azure-resurser?
Starta programmet. Meddelanden som i följande exempel publiceras i programloggen:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed