ServiceBusReceiverAsyncClient Klasse
- java.
lang. Object - com.
azure. messaging. servicebus. ServiceBusReceiverAsyncClient
- com.
Implementiert
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable
Ein asynchroner Empfänger, der für den Empfang ServiceBusReceivedMessage von einer Azure Service Bus Warteschlange oder eines Themas/Abonnements verantwortlich ist.
Die in diesem Dokument gezeigten Beispiele verwenden ein Anmeldeinformationsobjekt namens DefaultAzureCredential für die Authentifizierung, das für die meisten Szenarien geeignet ist, einschließlich lokaler Entwicklungs- und Produktionsumgebungen. Darüber hinaus wird empfohlen, die verwaltete Identität für die Authentifizierung in Produktionsumgebungen zu verwenden. Weitere Informationen zu verschiedenen Authentifizierungsmethoden und den entsprechenden Anmeldeinformationstypen finden Sie in der Azure Identity-Dokumentation.
Beispiel: Erstellen eines ServiceBusReceiverAsyncClient
Im folgenden Codebeispiel wird die Erstellung des asynchronen Clients ServiceBusReceiverAsyncClientveranschaulicht. Der fullyQualifiedNamespace
ist der Hostname des Service Bus-Namespaces. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigieren. Die verwendeten Anmeldeinformationen basieren DefaultAzureCredential
darauf, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. PEEK_LOCK (der Standard-Empfangsmodus) und disableAutoComplete() werden dringend empfohlen, damit Benutzer die Kontrolle über die Nachrichtenabwicklung haben.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusReceiverAsyncClient asyncReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.disableAutoComplete()
.queueName(queueName)
.buildAsyncClient();
// When users are done with the receiver, dispose of the receiver.
// Clients should be long-lived objects as they require resources
// and time to establish a connection to the service.
asyncReceiver.close();
Beispiel: Empfangen aller Nachrichten von der Service Bus-Ressource
Dadurch wird ein unendlicher Nachrichtenstrom von Service Bus zurückgegeben. Der Stream endet, wenn das Abonnement oder andere Terminalszenarien verworfen wird. Weitere Informationen finden Sie unter receiveMessages().
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
// Consider using Flux.usingWhen to scope the creation, usage, and cleanup of the receiver.
Disposable subscription = asyncReceiver.receiveMessages()
.flatMap(message -> {
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return asyncReceiver.complete(message);
} else {
return asyncReceiver.abandon(message);
}
})
.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
// When program ends, or you're done receiving all messages, dispose of the receiver.
// Clients should be long-lived objects as they
// require resources and time to establish a connection to the service.
asyncReceiver.close();
Beispiel: Empfangen von Nachrichten im RECEIVE_AND_DELETE Modus von einer Service Bus-Entität
Das folgende Codebeispiel veranschaulicht die Erstellung des asynchronen Clients ServiceBusReceiverAsyncClient mithilfe RECEIVE_AND_DELETEvon . Der fullyQualifiedNamespace
ist der Hostname des Service Bus-Namespaces. Sie wird im Bereich "Essentials" aufgeführt, nachdem Sie über das Azure-Portal zum Event Hubs-Namespace navigieren. Die verwendeten Anmeldeinformationen basieren DefaultAzureCredential
darauf, dass sie häufig verwendete Anmeldeinformationen in Bereitstellung und Entwicklung kombiniert und die zu verwendenden Anmeldeinformationen basierend auf der ausgeführten Umgebung auswählen. Weitere Informationen zum Empfangen von Nachrichten in diesem Modus finden Sie RECEIVE_AND_DELETE in der Dokumentation.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// Keep a reference to `subscription`. When the program is finished receiving messages, call
// subscription.dispose(). This will stop fetching messages from the Service Bus.
Disposable subscription = Flux.usingWhen(
Mono.fromCallable(() -> {
// Setting the receiveMode when creating the receiver enables receive and delete mode. By default,
// peek lock mode is used. In peek lock mode, users are responsible for settling messages.
return new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.receiver()
.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
.queueName(queueName)
.buildAsyncClient();
}), receiver -> {
return receiver.receiveMessages();
}, receiver -> {
return Mono.fromRunnable(() -> receiver.close());
})
.subscribe(message -> {
// Messages received in RECEIVE_AND_DELETE mode do not have to be settled because they are automatically
// removed from the queue.
System.out.printf("Received Seq #: %s%n", message.getSequenceNumber());
System.out.printf("Contents of message as string: %s%n", message.getBody());
},
error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Beispiel: Empfangen von Nachrichten aus einer bestimmten Sitzung
Um Nachrichten aus einer bestimmten Sitzung abzurufen, wechseln Sie zum ServiceBusSessionReceiverClientBuilder Sitzungsempfängerclient, und erstellen Sie diesen. Verwenden Sie acceptSession(String sessionId) zum Erstellen einer sitzungsgebundenen ServiceBusReceiverAsyncClient. Im Beispiel wird davon ausgegangen, dass Service Bus-Sitzungen zum Zeitpunkt der Warteschlangenerstellung aktiviert wurden.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// acceptSession(String) completes successfully with a receiver when "<<my-session-id>>" session is
// successfully locked.
// `Flux.usingWhen` is used, so we dispose of the receiver resource after `receiveMessages()` and the settlement
// operations complete.
// `Mono.usingWhen` can also be used if the resource closure returns a single item.
Flux<Void> sessionMessages = Flux.usingWhen(
sessionReceiver.acceptSession("<<my-session-id>>"),
receiver -> {
// Receive messages from <<my-session-id>> session.
return receiver.receiveMessages().flatMap(message -> {
System.out.printf("Received Sequence #: %s. Contents: %s%n", message.getSequenceNumber(),
message.getBody());
// Explicitly settle the message using complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
});
},
receiver -> Mono.fromRunnable(() -> {
// Dispose of resources.
receiver.close();
sessionReceiver.close();
}));
// When program ends, or you're done receiving all messages, the `subscription` can be disposed of. This code
// is non-blocking and kicks off the operation.
Disposable subscription = sessionMessages.subscribe(
unused -> {
}, error -> System.err.print("Error receiving message from session: " + error),
() -> System.out.println("Completed receiving from session."));
Beispiel: Empfangen von Nachrichten aus der ersten verfügbaren Sitzung
Um Nachrichten aus der ersten verfügbaren Sitzung zu verarbeiten, wechseln Sie zum ServiceBusSessionReceiverClientBuilder Sitzungsempfängerclient, und erstellen Sie diesen. Verwenden Sie acceptNextSession() , um die erste verfügbare Sitzung zum Verarbeiten von Nachrichten zu finden.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
// 'disableAutoComplete' indicates that users will explicitly settle their message.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
.credential(fullyQualifiedNamespace, credential)
.sessionReceiver()
.disableAutoComplete()
.queueName(sessionEnabledQueueName)
.buildAsyncClient();
// Creates a client to receive messages from the first available session. It waits until
// AmqpRetryOptions.getTryTimeout() elapses. If no session is available within that operation timeout, it
// completes with a retriable error. Otherwise, a receiver is returned when a lock on the session is acquired.
Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptNextSession();
Flux<Void> receiveMessagesFlux = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages().flatMap(message -> {
System.out.println("Received message: " + message.getBody());
// Explicitly settle the message via complete, abandon, defer, dead-letter, etc.
if (isMessageProcessed) {
return receiver.complete(message);
} else {
return receiver.abandon(message);
}
}),
receiver -> Mono.fromRunnable(() -> {
// Dispose of the receiver and sessionReceiver when done receiving messages.
receiver.close();
sessionReceiver.close();
}));
// This is a non-blocking call that moves onto the next line of code after setting up and starting the receive
// operation. Customers can keep a reference to `subscription` and dispose of it when they want to stop
// receiving messages.
Disposable subscription = receiveMessagesFlux.subscribe(unused -> {
}, error -> System.out.println("Error occurred: " + error),
() -> System.out.println("Receiving complete."));
Beispiel: Ratenbegrenzung des Verbrauchs von Nachrichten aus einer Service Bus-Entität
Für Nachrichtenempfänger, die die Anzahl der nachrichten einschränken müssen, die sie zu einem bestimmten Zeitpunkt empfangen, können sie verwenden BaseSubscriber#request(long).
// This is a non-blocking call. The program will move to the next line of code after setting up the operation.
asyncReceiver.receiveMessages().subscribe(new BaseSubscriber<ServiceBusReceivedMessage>() {
private static final int NUMBER_OF_MESSAGES = 5;
private final AtomicInteger currentNumberOfMessages = new AtomicInteger();
@Override
protected void hookOnSubscribe(Subscription subscription) {
// Tell the Publisher we only want 5 message at a time.
request(NUMBER_OF_MESSAGES);
}
@Override
protected void hookOnNext(ServiceBusReceivedMessage message) {
// Process the ServiceBusReceivedMessage
// If the number of messages we have currently received is a multiple of 5, that means we have reached
// the last message the Subscriber will provide to us. Invoking request(long) here, tells the Publisher
// that the subscriber is ready to get more messages from upstream.
if (currentNumberOfMessages.incrementAndGet() % 5 == 0) {
request(NUMBER_OF_MESSAGES);
}
}
});
Methodenzusammenfassung
Modifizierer und Typ | Methode und Beschreibung |
---|---|
Mono<Void> |
abandon(ServiceBusReceivedMessage message)
Gibt ein ServiceBusReceivedMessageauf. |
Mono<Void> |
abandon(ServiceBusReceivedMessage message, AbandonOptions options)
Gibt auf, dass die ServiceBusReceivedMessage Eigenschaften der Nachricht aktualisiert werden. |
void |
close()
Entsorgt den Consumer, indem die zugrunde liegenden Links zum Dienst geschlossen werden. |
Mono<Void> |
commitTransaction(ServiceBusTransactionContext transactionContext)
Committent die Transaktion und alle ihr zugeordneten Vorgänge. |
Mono<Void> |
complete(ServiceBusReceivedMessage message)
Schließt eine ab ServiceBusReceivedMessage. |
Mono<Void> |
complete(ServiceBusReceivedMessage message, CompleteOptions options)
Schließt eine ServiceBusReceivedMessage mit den angegebenen Optionen ab. |
Mono<Service |
createTransaction()
Startet eine neue dienstseitige Transaktion. |
Mono<Void> |
deadLetter(ServiceBusReceivedMessage message)
Verschiebt eine ServiceBusReceivedMessage in die Unterwarteschlange für unzustellbare Nachrichten. |
Mono<Void> |
deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options)
Verschiebt eine ServiceBusReceivedMessage mit den angegebenen Optionen in die Unterwarteschlange für unzustellbare Nachrichten. |
Mono<Void> |
defer(ServiceBusReceivedMessage message)
Verschiebt einen ServiceBusReceivedMessagezurück. |
Mono<Void> |
defer(ServiceBusReceivedMessage message, DeferOptions options)
Verschiebt eine ServiceBusReceivedMessage mit den festgelegten Optionen. |
String |
getEntityPath()
Ruft die Service Bus-Ressource ab, mit der dieser Client interagiert. |
String |
getFullyQualifiedNamespace()
Ruft den vollqualifizierten Service Bus-Namespace ab, dem die Verbindung zugeordnet ist. |
String |
getIdentifier()
Ruft den Bezeichner des instance von abServiceBusReceiverAsyncClient. |
String |
getSessionId()
Ruft die Sitzungs-ID |
Mono<byte[]> |
getSessionState()
Ruft den Zustand der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist. |
Mono<Service |
peekMessage()
Liest die nächste aktive Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. |
Mono<Service |
peekMessage(long sequenceNumber)
Ab der angegebenen Sequenznummer liest neben der aktiven Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. |
Flux<Service |
peekMessages(int maxMessages)
Liest den nächsten Batch mit aktiven Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. |
Flux<Service |
peekMessages(int maxMessages, long sequenceNumber)
Liest ab der angegebenen Sequenznummer den nächsten Batch aktiver Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. |
Mono<Service |
receiveDeferredMessage(long sequenceNumber)
Empfängt eine verzögerte ServiceBusReceivedMessage. |
Flux<Service |
receiveDeferredMessages(Iterable<Long> sequenceNumbers)
Empfängt einen Batch mit verzögertem ServiceBusReceivedMessage. |
Flux<Service |
receiveMessages()
Empfängt einen unendlichen Stream von ServiceBusReceivedMessage von der Service Bus-Entität. |
Mono<Offset |
renewMessageLock(ServiceBusReceivedMessage message)
Erneuert asynchron die Sperre für die Nachricht. |
Mono<Void> |
renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration)
Startet die verlängerung der automatischen Sperre für ein ServiceBusReceivedMessage. |
Mono<Offset |
renewSessionLock()
Verlängert die Sitzungssperre, wenn dieser Empfänger ein Sitzungsempfänger ist. |
Mono<Void> |
renewSessionLock(Duration maxLockRenewalDuration)
Startet die Verlängerung der automatischen Sperre für die Sitzung, für die dieser Empfänger arbeitet. |
Mono<Void> |
rollbackTransaction(ServiceBusTransactionContext transactionContext)
Rollbacks für die angegebene Transaktion und alle damit verbundenen Vorgänge. |
Mono<Void> |
setSessionState(byte[] sessionState)
Legt den Zustand der Sitzung fest, für die dieser Empfänger arbeitet. |
Geerbte Methoden von java.lang.Object
Details zur Methode
abandon
public Mono
Gibt ein ServiceBusReceivedMessageauf. Dadurch wird die Nachricht wieder für die Verarbeitung verfügbar. Wenn Sie eine Nachricht abbrechen, wird die Anzahl der Zustellungen für die Nachricht erhöht.
Parameters:
Returns:
abandon
public Mono
Beendet eine ServiceBusReceivedMessage Aktualisierung der Nachrichteneigenschaften. Dadurch wird die Nachricht wieder für die Verarbeitung verfügbar. Wenn Sie eine Nachricht abbrechen, wird die Anzahl der Zustellungen für die Nachricht erhöht.
Parameters:
Returns:
close
public void close()
Entsorgt den Consumer, indem die zugrunde liegenden Links zum Dienst geschlossen werden.
commitTransaction
public Mono
Committent die Transaktion und alle ihr zugeordneten Vorgänge.
Erstellen und Verwenden einer Transaktion
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
complete
public Mono
Schließt eine ab ServiceBusReceivedMessage. Dadurch wird die Nachricht aus dem Dienst gelöscht.
Parameters:
Returns:
complete
public Mono
Schließt eine ServiceBusReceivedMessage mit den angegebenen Optionen ab. Dadurch wird die Nachricht aus dem Dienst gelöscht.
Parameters:
Returns:
createTransaction
public Mono
Startet eine neue dienstseitige Transaktion. Sollte ServiceBusTransactionContext an alle Vorgänge übergeben werden, die in dieser Transaktion sein müssen.
Erstellen und Verwenden einer Transaktion
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Returns:
deadLetter
public Mono
Verschiebt eine ServiceBusReceivedMessage in die Unterwarteschlange für unzustellbare Nachrichten.
Parameters:
Returns:
deadLetter
public Mono
Verschiebt eine ServiceBusReceivedMessage mit den angegebenen Optionen in die Unterwarteschlange für unzustellbare Nachrichten.
Parameters:
Returns:
defer
public Mono
Verschiebt einen ServiceBusReceivedMessagezurück. Dadurch wird die Nachricht in die verzögerte Untergeordnete Warteschlange verschoben.
Parameters:
Returns:
defer
public Mono
Verschiebt eine ServiceBusReceivedMessage mit den festgelegten Optionen. Dadurch wird die Nachricht in die verzögerte Untergeordnete Warteschlange verschoben.
Parameters:
Returns:
getEntityPath
public String getEntityPath()
Ruft die Service Bus-Ressource ab, mit der dieser Client interagiert.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Ruft den vollqualifizierten Service Bus-Namespace ab, dem die Verbindung zugeordnet ist. Dies ist wahrscheinlich ähnlich wie .{yournamespace}.servicebus.windows.net
Returns:
getIdentifier
public String getIdentifier()
Ruft den Bezeichner des instance von abServiceBusReceiverAsyncClient.
Returns:
getSessionId
public String getSessionId()
Ruft die SessionId der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.
Returns:
getSessionState
public Mono
Ruft den Zustand der Sitzung ab, wenn dieser Empfänger ein Sitzungsempfänger ist.
Returns:
peekMessage
public Mono
Liest die nächste aktive Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern. Der erste Aufruf von ruft peek()
die erste aktive Nachricht für diesen Empfänger ab. Jeder nachfolgende Aufruf ruft die nachfolgende Nachricht in der Entität ab.
Returns:
peekMessage
public Mono
Ab der angegebenen Sequenznummer liest neben der aktiven Nachricht, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.
Parameters:
Returns:
peekMessages
public Flux
Liest den nächsten Batch mit aktiven Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.
Parameters:
Returns:
peekMessages
public Flux
Liest ab der angegebenen Sequenznummer den nächsten Batch aktiver Nachrichten, ohne den Status des Empfängers oder der Nachrichtenquelle zu ändern.
Parameters:
Returns:
receiveDeferredMessage
public Mono
Empfängt eine verzögerte ServiceBusReceivedMessage. Verzögerte Nachrichten können nur mithilfe der Sequenznummer empfangen werden.
Parameters:
Returns:
sequenceNumber
.receiveDeferredMessages
public Flux
Empfängt einen Batch mit verzögertem ServiceBusReceivedMessage. Verzögerte Nachrichten können nur mithilfe der Sequenznummer empfangen werden.
Parameters:
Returns:
receiveMessages
public Flux
Empfängt einen unendlichen Stream von ServiceBusReceivedMessage von der Service Bus-Entität. Dieser Flux empfängt kontinuierlich Nachrichten von einer Service Bus-Entität, bis:
- Der Empfänger ist geschlossen.
- Das Abonnement für flux wird verworfen.
- Ein Terminalsignal von einem Downstreamabonnent wird Upstream (dh. Flux#take(long) oder Flux#take(Duration)).
- Ein AmqpException tritt auf, der dazu führt, dass der Empfangslink beendet wird.
Der Client verwendet einen AMQP-Link darunter, um die Nachrichten zu empfangen. Der Client wechselt transparent zu einem neuen AMQP-Link, wenn für den aktuellen ein wiederholbarer Fehler auftritt. Wenn auf dem Client ein nicht wiederholbarer Fehler auftritt oder die Wiederholungsversuche erschöpft sind, wird der Terminalhandler des org.reactivestreams.Subscriber#onError(Throwable) Abonnenten mit diesem Fehler benachrichtigt. Nach dem Terminalereignis werden keine weiteren Nachrichten übermittelt org.reactivestreams.Subscriber#onNext(Object) . Die Anwendung muss einen neuen Client erstellen, um den Empfang fortzusetzen. Das erneute Abonnieren des Flux des alten Clients hat keine Auswirkungen.
Hinweis: Einige Beispiele für nicht wiederholbare Fehler sind: Die Anwendung versucht, eine Verbindung mit einer Warteschlange herzustellen, die nicht vorhanden ist, das Löschen oder Deaktivieren der Warteschlange in der Mitte des Empfangs, der Benutzer initiiert explizit geo-DR. Dies sind bestimmte Ereignisse, bei denen Service Bus dem Client mitgeteilt, dass ein nicht wiederholbarer Fehler aufgetreten ist.
Returns:
renewMessageLock
public Mono
Erneuert asynchron die Sperre für die Nachricht. Die Sperre wird basierend auf der für die Entität angegebenen Einstellung verlängert. Wenn eine Nachricht im PEEK_LOCK Modus empfangen wird, wird die Nachricht auf dem Server für diesen Empfänger instance für einen Zeitraum gesperrt, der während der Entitätserstellung angegeben wurde (LockDuration). Wenn die Verarbeitung der Nachricht länger als diese Dauer erfordert, muss die Sperre verlängert werden. Bei jeder Verlängerung wird die Sperre auf den LockDuration-Wert der Entität zurückgesetzt.
Parameters:
Returns:
renewMessageLock
public Mono
Startet die verlängerung der automatischen Sperre für ein ServiceBusReceivedMessage.
Parameters:
Returns:
maxLockRenewalDuration
abgeschlossen ist.renewSessionLock
public Mono
Verlängert die Sitzungssperre, wenn dieser Empfänger ein Sitzungsempfänger ist.
Returns:
renewSessionLock
public Mono
Startet die Verlängerung der automatischen Sperre für die Sitzung, für die dieser Empfänger arbeitet.
Parameters:
Returns:
rollbackTransaction
public Mono
Rollbacks für die angegebene Transaktion und alle damit verbundenen Vorgänge.
Erstellen und Verwenden einer Transaktion
// This mono creates a transaction and caches the output value, so we can associate operations with the
// transaction. It does not cache the value if it is an error or completes with no items, effectively retrying
// the operation.
Mono<ServiceBusTransactionContext> transactionContext = asyncReceiver.createTransaction()
.cache(value -> Duration.ofMillis(Long.MAX_VALUE),
error -> Duration.ZERO,
() -> Duration.ZERO);
// Dispose of the disposable to cancel the operation.
Disposable disposable = transactionContext.flatMap(transaction -> {
// Process messages and associate operations with the transaction.
Mono<Void> operations = Mono.when(
asyncReceiver.receiveDeferredMessage(sequenceNumber).flatMap(message ->
asyncReceiver.complete(message, new CompleteOptions().setTransactionContext(transaction))),
asyncReceiver.abandon(receivedMessage, new AbandonOptions().setTransactionContext(transaction)));
// Finally, either commit or rollback the transaction once all the operations are associated with it.
return operations.then(asyncReceiver.commitTransaction(transaction));
}).subscribe(unused -> {
}, error -> {
System.err.println("Error occurred processing transaction: " + error);
}, () -> {
System.out.println("Completed transaction");
});
Parameters:
Returns:
setSessionState
public Mono
Legt den Zustand der Sitzung fest, für die dieser Empfänger arbeitet.
Parameters:
Returns:
Gilt für:
Azure SDK for Java