Partager via


ServiceBusReceiverAsyncClient Classe

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient

Implémente

public final class ServiceBusReceiverAsyncClient
implements AutoCloseable

Récepteur asynchrone responsable de la réception à partir d’une ServiceBusReceivedMessage Azure Service Bus file d’attente ou d’une rubrique/abonnement.

Les exemples présentés dans ce document utilisent un objet d’informations d’identification nommé DefaultAzureCredential pour l’authentification, ce qui est approprié pour la plupart des scénarios, y compris les environnements de développement et de production locaux. En outre, nous vous recommandons d’utiliser l’identité managée pour l’authentification dans les environnements de production. Vous trouverez plus d’informations sur les différentes méthodes d’authentification et leurs types d’informations d’identification correspondants dans la documentation Azure Identity .

Exemple : création d’un ServiceBusReceiverAsyncClient

L’exemple de code suivant illustre la création du client ServiceBusReceiverAsyncClientasynchrone . fullyQualifiedNamespace est le nom d’hôte de l’espace de noms Service Bus. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure. Les informations d’identification utilisées sont DefaultAzureCredential dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement en cours d’exécution. PEEK_LOCK (mode de réception par défaut) et disableAutoComplete() sont fortement recommandés afin que les utilisateurs aient le contrôle sur le règlement des messages.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .receiver()
     .disableAutoComplete()
     .queueName(queueName)
     .buildAsyncClient();

 // When users are done with the receiver, dispose of the receiver.
 // Clients should be long-lived objects as they require resources
 // and time to establish a connection to the service.
 asyncReceiver.close();

Exemple : Recevoir tous les messages de la ressource Service Bus

Cela retourne un flux infini de messages à partir de Service Bus. Le flux se termine lorsque l’abonnement est supprimé ou d’autres scénarios de terminal. Consultez la rubrique receiveMessages() (éventuellement en anglais) pour plus d'informations.

// Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 // Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
 Disposable subscription = asyncReceiver.receiveMessages()
     .flatMap(message -> {
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());

         // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return asyncReceiver.complete(message);
         } else {
             return asyncReceiver.abandon(message);
         }
     })
     .subscribe(unused -> {
     }, error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

 // When program ends, or you're done receiving all messages, dispose of the receiver.
 // Clients should be long-lived objects as they
 // require resources and time to establish a connection to the service.
 asyncReceiver.close();

Exemple : Recevoir des messages en RECEIVE_AND_DELETE mode à partir d’une entité Service Bus

L’exemple de code suivant illustre la création du client ServiceBusReceiverAsyncClient asynchrone à l’aide de RECEIVE_AND_DELETE. fullyQualifiedNamespace est le nom d’hôte de l’espace de noms Service Bus. Il est répertorié sous le volet « Essentials » après avoir accédé à l’espace de noms Event Hubs via le portail Azure. Les informations d’identification utilisées sont DefaultAzureCredential dues au fait qu’elles combinent les informations d’identification couramment utilisées dans le déploiement et le développement, et qu’elles choisissent les informations d’identification à utiliser en fonction de leur environnement en cours d’exécution. Consultez RECEIVE_AND_DELETE la documentation pour plus d’informations sur la réception de messages à l’aide de ce mode.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // Keep a reference to `subscription`. When the program is finished receiving messages, call
 // subscription.dispose(). This will stop fetching messages from the Service Bus.
 Disposable subscription = Flux.usingWhen(
         Mono.fromCallable(() -> {
             // Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
             // peek lock mode is used. In peek lock mode, users are responsible for settling messages.
             return new ServiceBusClientBuilder()
                 .credential(fullyQualifiedNamespace, credential)
                 .receiver()
                 .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
                 .queueName(queueName)
                 .buildAsyncClient();
         }), receiver -> {
             return receiver.receiveMessages();
         }, receiver -> {
             return Mono.fromRunnable(() -> receiver.close());
         })
     .subscribe(message -> {
             // Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
             // removed from the queue.
         System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
         System.out.printf("Contents of message as string: %s%n", message.getBody());
     },
         error -> System.out.println("Error occurred: " + error),
         () -> System.out.println("Receiving complete."));

Exemple : recevoir des messages d’une session spécifique

Pour extraire des messages d’une session spécifique, basculez vers ServiceBusSessionReceiverClientBuilder et générez le client récepteur de session. Utilisez acceptSession(String sessionId) pour créer un objet lié à la ServiceBusReceiverAsyncClientsession . L’exemple suppose que les sessions Service Bus ont été activées au moment de la création de la file d’attente.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
 // successfully locked.
 // `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
 // operations complete.
 // `Mono.usingWhen` can also be used if the resource closure returns a single item.
 Flux<Void> sessionMessages = Flux.usingWhen(
     sessionReceiver.acceptSession("<<my-session-id>>"),
     receiver -> {
         // Receive messages from <<my-session-id>> session.
         return receiver.receiveMessages().flatMap(message -> {
             System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
                 message.getBody());

             // Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
             if (isMessageProcessed) {
                 return receiver.complete(message);
             } else {
                 return receiver.abandon(message);
             }
         });
     },
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of resources.
         receiver.close();
         sessionReceiver.close();
     }));

 // When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
 // is non-blocking and kicks off the operation.
 Disposable subscription = sessionMessages.subscribe(
     unused -> {
     }, error -> System.err.print("Error receiving message from session: " + error),
     () -> System.out.println("Completed receiving from session."));

Exemple : Recevoir des messages de la première session disponible

Pour traiter les messages de la première session disponible, basculez vers ServiceBusSessionReceiverClientBuilder et générez le client récepteur de session. Utilisez acceptNextSession() pour rechercher la première session disponible à partir de laquelle traiter les messages.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 // 'disableAutoComplete' indicates that users will explicitly settle their message.
 ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, credential)
     .sessionReceiver()
     .disableAutoComplete()
     .queueName(sessionEnabledQueueName)
     .buildAsyncClient();

 // Creates a client to receive messages from the first available session. It waits until
 // AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
 // completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
 Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();

 Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
     receiver -> receiver.receiveMessages().flatMap(message -> {
         System.out.println("Received message: " + message.getBody());

         // Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
         if (isMessageProcessed) {
             return receiver.complete(message);
         } else {
             return receiver.abandon(message);
         }
     }),
     receiver -> Mono.fromRunnable(() -> {
         // Dispose of the receiver and sessionReceiver when done receiving messages.
         receiver.close();
         sessionReceiver.close();
     }));

 // This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
 // operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
 // receiving messages.
 Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
 }, error -> System.out.println("Error occurred: " + error),
     () -> System.out.println("Receiving complete."));

Exemple : Débit limitant la consommation des messages d’une entité Service Bus

Pour les récepteurs de messages qui doivent limiter le nombre de messages qu’ils reçoivent à un moment donné, ils peuvent utiliser BaseSubscriber#request(long).

// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
 asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
     private static final int NUMBER_OF_MESSAGES = 5;
     private final AtomicInteger currentNumberOfMessages = new AtomicInteger();

     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         // Tell the Publisher we only want 5 message at a time.
         request(NUMBER_OF_MESSAGES);
     }

     @Override
     protected void hookOnNext(ServiceBusReceivedMessage message) {
         // Process the ServiceBusReceivedMessage
         // If the number of messages we have currently received is a multiple of 5, that means we have reached
         // the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
         // that the subscriber is ready to get more messages from upstream.
         if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
             request(NUMBER_OF_MESSAGES);
         }
     }
 });

Résumé de la méthode

Modificateur et type Méthode et description
Mono<Void> abandon(ServiceBusReceivedMessage message)

Abandonne un ServiceBusReceivedMessage.

Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandonne une ServiceBusReceivedMessage mise à jour des propriétés du message.

void close()

Élimine le consommateur en fermant les liens sous-jacents au service.

Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext)

Valide la transaction et toutes les opérations qui lui sont associées.

Mono<Void> complete(ServiceBusReceivedMessage message)

Termine un .ServiceBusReceivedMessage

Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options)

Termine un ServiceBusReceivedMessage avec les options indiquées.

Mono<ServiceBusTransactionContext> createTransaction()

Démarre une nouvelle transaction côté service.

Mono<Void> deadLetter(ServiceBusReceivedMessage message)

Déplace un ServiceBusReceivedMessage vers la sous-file d’attente de lettres mortes.

Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Déplace un ServiceBusReceivedMessage vers la sous-file d’attente de lettres mortes avec les options indiquées.

Mono<Void> defer(ServiceBusReceivedMessage message)

Reporte un ServiceBusReceivedMessage.

Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)

Reporte un ServiceBusReceivedMessage avec les options définies.

String getEntityPath()

Obtient la ressource Service Bus avec laquelle ce client interagit.

String getFullyQualifiedNamespace()

Obtient l’espace de noms Service Bus complet auquel la connexion est associée.

String getIdentifier()

Obtient l’identificateur du instance de ServiceBusReceiverAsyncClient.

String getSessionId()

Obtient l’ID de sessionde la session si ce récepteur est un récepteur de session.

Mono<byte[]> getSessionState()

Obtient l’état de la session si ce récepteur est un récepteur de session.

Mono<ServiceBusReceivedMessage> peekMessage()

Lit le message actif suivant sans modifier l’état du récepteur ou de la source du message.

Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber)

À partir du numéro de séquence donné, lit ensuite le message actif sans modifier l’état du récepteur ou de la source du message.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages)

Lit le lot suivant de messages actifs sans modifier l’état du récepteur ou de la source du message.

Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber)

À partir du numéro de séquence donné, lit le lot suivant de messages actifs sans modifier l’état du récepteur ou de la source du message.

Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber)

Reçoit un différé ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers)

Reçoit un lot de ServiceBusReceivedMessage.

Flux<ServiceBusReceivedMessage> receiveMessages()

Reçoit un flux infini de ServiceBusReceivedMessage de l’entité Service Bus.

Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)

Renouvelle de manière asynchrone le verrou sur le message.

Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Démarre le renouvellement du verrouillage automatique pour un ServiceBusReceivedMessage.

Mono<OffsetDateTime> renewSessionLock()

Renouvelle le verrou de session si ce récepteur est un récepteur de session.

Mono<Void> renewSessionLock(Duration maxLockRenewalDuration)

Démarre le renouvellement du verrouillage automatique pour la session pour laquelle ce récepteur fonctionne.

Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext)

Restaure la transaction donnée et toutes les opérations qui lui sont associées.

Mono<Void> setSessionState(byte[] sessionState)

Définit l’état de la session pour laquelle ce récepteur fonctionne.

Méthodes héritées de java.lang.Object

Détails de la méthode

abandon

public Mono abandon(ServiceBusReceivedMessage message)

Abandonne un ServiceBusReceivedMessage. Cela rend le message à nouveau disponible pour traitement. L’abandon d’un message augmente le nombre de remises sur le message.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.

Returns:

Mono qui se termine lorsque l’opération d’abandon de Service Bus se termine.

abandon

public Mono abandon(ServiceBusReceivedMessage message, AbandonOptions options)

Abandonne une ServiceBusReceivedMessage mise à jour des propriétés du message. Cela rend le message à nouveau disponible pour traitement. L’abandon d’un message augmente le nombre de remises sur le message.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.
options - Options à définir lors de l’abandon du message.

Returns:

Mono qui se termine lorsque l’opération Service Bus se termine.

close

public void close()

Élimine le consommateur en fermant les liens sous-jacents au service.

commitTransaction

public Mono commitTransaction(ServiceBusTransactionContext transactionContext)

Valide la transaction et toutes les opérations qui lui sont associées.

Création et utilisation d’une transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Transaction à valider.

Returns:

Mono qui termine cette opération sur la ressource Service Bus.

complete

public Mono complete(ServiceBusReceivedMessage message)

Termine un .ServiceBusReceivedMessage Cela supprime le message du service.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.

Returns:

Mono qui se termine lorsque le message est terminé sur Service Bus.

complete

public Mono complete(ServiceBusReceivedMessage message, CompleteOptions options)

Termine un ServiceBusReceivedMessage avec les options indiquées. Cela supprime le message du service.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.
options - Options utilisées pour terminer le message.

Returns:

Mono qui se termine lorsque le message est terminé sur Service Bus.

createTransaction

public Mono createTransaction()

Démarre une nouvelle transaction côté service. Le ServiceBusTransactionContext doit être passé à toutes les opérations qui doivent se trouver dans cette transaction.

Création et utilisation d’une transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Returns:

Mono qui termine cette opération sur la ressource Service Bus.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message)

Déplace un ServiceBusReceivedMessage vers la sous-file d’attente de lettres mortes.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.

Returns:

Mono qui se termine lorsque l’opération de lettre morte se termine.

deadLetter

public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)

Déplace un ServiceBusReceivedMessage vers la sous-file d’attente de lettres mortes avec les options indiquées.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.
options - Options utilisées pour mettre en lettres mortes le message.

Returns:

Mono qui se termine lorsque l’opération de lettre morte se termine.

defer

public Mono defer(ServiceBusReceivedMessage message)

Reporte un ServiceBusReceivedMessage. Cela déplacera le message dans la sous-file d’attente différée.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.

Returns:

Mono qui se termine à la fin de l’opération de report Service Bus.

defer

public Mono defer(ServiceBusReceivedMessage message, DeferOptions options)

Reporte un ServiceBusReceivedMessage avec les options définies. Cela déplacera le message dans la sous-file d’attente différée.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.
options - Options utilisées pour différer le message.

Returns:

Mono qui se termine à la fin de l’opération de report.

getEntityPath

public String getEntityPath()

Obtient la ressource Service Bus avec laquelle ce client interagit.

Returns:

Ressource Service Bus avec laquelle ce client interagit.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Obtient l’espace de noms Service Bus complet auquel la connexion est associée. Cela est probablement similaire à {yournamespace}.servicebus.windows.net.

Returns:

Espace de noms Service Bus complet auquel la connexion est associée.

getIdentifier

public String getIdentifier()

Obtient l’identificateur du instance de ServiceBusReceiverAsyncClient.

Returns:

Identificateur qui peut identifier le instance de ServiceBusReceiverAsyncClient.

getSessionId

public String getSessionId()

Obtient le SessionId de la session si ce récepteur est un récepteur de session.

Returns:

SessionId ou null s’il ne s’agit pas d’un récepteur de session.

getSessionState

public Mono getSessionState()

Obtient l’état de la session si ce récepteur est un récepteur de session.

Returns:

État de session ou mono vide s’il n’y a pas d’état défini pour la session.

peekMessage

public Mono peekMessage()

Lit le message actif suivant sans modifier l’état du récepteur ou de la source du message. Le premier appel à peek() extraire le premier message actif pour ce récepteur. Chaque appel suivant extrait le message suivant dans l’entité.

Returns:

peekMessage

public Mono peekMessage(long sequenceNumber)

À partir du numéro de séquence donné, lit ensuite le message actif sans modifier l’état du récepteur ou de la source du message.

Parameters:

sequenceNumber - Numéro de séquence à partir duquel lire le message.

Returns:

peekMessages

public Flux peekMessages(int maxMessages)

Lit le lot suivant de messages actifs sans modifier l’état du récepteur ou de la source du message.

Parameters:

maxMessages - Nombre de messages.

Returns:

ServiceBusReceivedMessage de Flux qui sont peeked.

peekMessages

public Flux peekMessages(int maxMessages, long sequenceNumber)

À partir du numéro de séquence donné, lit le lot suivant de messages actifs sans modifier l’état du récepteur ou de la source du message.

Parameters:

maxMessages - Nombre de messages.
sequenceNumber - Numéro de séquence à partir duquel commencer la lecture des messages.

Returns:

A Flux de jeter un coup d’œil ServiceBusReceivedMessage .

receiveDeferredMessage

public Mono receiveDeferredMessage(long sequenceNumber)

Reçoit un différé ServiceBusReceivedMessage. Les messages différés ne peuvent être reçus qu’à l’aide du numéro de séquence.

Parameters:

sequenceNumber - getSequenceNumber() du message.

Returns:

Message différé avec la correspondance sequenceNumber.

receiveDeferredMessages

public Flux receiveDeferredMessages(Iterable sequenceNumbers)

Reçoit un lot de ServiceBusReceivedMessage. Les messages différés ne peuvent être reçus qu’à l’aide du numéro de séquence.

Parameters:

sequenceNumbers - Numéros de séquence des messages différés.

Returns:

A Flux de différé .ServiceBusReceivedMessage

receiveMessages

public Flux receiveMessages()

Reçoit un flux infini de ServiceBusReceivedMessage de l’entité Service Bus. Ce flux reçoit en continu les messages d’une entité Service Bus jusqu’à ce que :

  • Le récepteur est fermé.
  • L’abonnement au Flux est supprimé.
  • Un signal terminal provenant d’un abonné en aval est propagé amont (par exemple. Flux#take(long) ou Flux#take(Duration)).
  • Un AmqpException se produit qui entraîne l’arrêt du lien de réception.

Le client utilise un lien AMQP en dessous pour recevoir les messages ; le client effectue une transition transparente vers un nouveau lien AMQP si le client actuel rencontre une erreur pouvant être retentée. Lorsque le client rencontre une erreur non retenable ou épuise les nouvelles tentatives, le gestionnaire de terminal de l’Abonné org.reactivestreams.Subscriber#onError(Throwable) est averti de cette erreur. Aucun autre message ne sera remis après org.reactivestreams.Subscriber#onNext(Object) l’événement de terminal ; l’application doit créer un nouveau client pour reprendre la réception. Le réabonnement au Flux de l’ancien client n’aura aucun effet.

Remarque : voici quelques exemples d’erreurs non retriables : l’application qui tente de se connecter à une file d’attente qui n’existe pas, supprime ou désactive la file d’attente au milieu de la réception, l’utilisateur lançant explicitement Geo-DR. Il s’agit de certains événements où Service Bus indique au client qu’une erreur non retenable s’est produite.

Returns:

Flux infini de messages de l’entité Service Bus.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message)

Renouvelle de manière asynchrone le verrou sur le message. Le verrou est renouvelé en fonction du paramètre spécifié sur l’entité. Lorsqu’un message est reçu en PEEK_LOCK mode, le message est verrouillé sur le serveur pour ce récepteur instance pendant une durée spécifiée lors de la création de l’entité (LockDuration). Si le traitement du message nécessite une durée supérieure à cette durée, le verrou doit être renouvelé. Pour chaque renouvellement, le verrou est réinitialisé à la valeur LockDuration de l’entité.

Parameters:

message - pour effectuer le ServiceBusReceivedMessage renouvellement de verrouillage automatique.

Returns:

Nouvelle heure d’expiration du message.

renewMessageLock

public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)

Démarre le renouvellement du verrouillage automatique pour un ServiceBusReceivedMessage.

Parameters:

message - ServiceBusReceivedMessage pour effectuer cette opération.
maxLockRenewalDuration - Durée maximale pour continuer à renouveler le jeton de verrouillage.

Returns:

Mono qui se termine lorsque l’opération de renouvellement de message est terminée jusqu’à maxLockRenewalDuration.

renewSessionLock

public Mono renewSessionLock()

Renouvelle le verrou de session si ce récepteur est un récepteur de session.

Returns:

Heure d’expiration suivante pour le verrou de session.

renewSessionLock

public Mono renewSessionLock(Duration maxLockRenewalDuration)

Démarre le renouvellement du verrouillage automatique pour la session pour laquelle ce récepteur fonctionne.

Parameters:

maxLockRenewalDuration - Durée maximale pour continuer à renouveler le verrou de session.

Returns:

Opération de renouvellement de verrou pour le message.

rollbackTransaction

public Mono rollbackTransaction(ServiceBusTransactionContext transactionContext)

Restaure la transaction donnée et toutes les opérations qui lui sont associées.

Création et utilisation d’une transaction

// This mono creates a transaction and caches the output value, so we can associate operations with the
 // transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
 // the operation.
 Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
     .cache(value -> Duration.ofMillis(Long.MAX_VALUE),
         error -> Duration.ZERO,
         () -> Duration.ZERO);

 // Dispose of the disposable to cancel the operation.
 Disposable disposable = transactionContext.flatMap(transaction -> {
     // Process messages and associate operations with the transaction.
     Mono<Void> operations = Mono.when(
         asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
             asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
         asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));

     // Finally, either commit or rollback the transaction once all the operations are associated with it.
     return operations.then(asyncReceiver.commitTransaction(transaction));
 }).subscribe(unused -> {
 }, error -> {
     System.err.println("Error occurred processing transaction: " + error);
 }, () -> {
     System.out.println("Completed transaction");
 });

Parameters:

transactionContext - Transaction à restaurer.

Returns:

Mono qui termine cette opération sur la ressource Service Bus.

setSessionState

public Mono setSessionState(byte[] sessionState)

Définit l’état de la session pour laquelle ce récepteur fonctionne.

Parameters:

sessionState - État à définir sur la session.

Returns:

Mono qui se termine lorsque la session est définie

S’applique à