ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder Class

  • java.lang.Object
    • com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder

public final class ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder

Builder for creating ServiceBusProcessorClient to consume messages from a session-based Service Bus entity. ServiceBusProcessorClient processes messages and errors via processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) and processError(Consumer<ServiceBusErrorContext> processError). When the processor finishes processing a session, it tries to fetch the next session to process.

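By default, the processor automatically settles messages: a successfully processed message is completed, and a message whose processing fails is abandoned. Call disableAutoComplete() to settle messages yourself.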

Instantiate a session-enabled processor client

// Function that gets called whenever a message is received.
 Consumer<ServiceBusReceivedMessageContext> onMessage = context -> {
     ServiceBusReceivedMessage message = context.getMessage();
     System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n",
         message.getSessionId(), message.getSequenceNumber(), message.getBody());
 };

 Consumer<ServiceBusErrorContext> onError = context -> {
     System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n",
         context.getFullyQualifiedNamespace(), context.getEntityPath());

     if (context.getException() instanceof ServiceBusException) {
         ServiceBusException exception = (ServiceBusException) context.getException();

         System.out.printf("Error source: %s, reason %s%n", context.getErrorSource(),
             exception.getReason());
     } else {
         System.out.printf("Error occurred: %s%n", context.getException());
     }
 };

 TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();

 // Create the processor client via the builder and its sub-builder
 // 'fullyQualifiedNamespace' will look similar to "{your-namespace}.servicebus.windows.net"
 ServiceBusProcessorClient sessionProcessor = new ServiceBusClientBuilder()
     .credential(fullyQualifiedNamespace, tokenCredential)
     .sessionProcessor()
     .queueName(sessionEnabledQueueName)
     .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
     .disableAutoComplete()
     .maxConcurrentSessions(2)
     .processMessage(onMessage)
     .processError(onError)
     .buildProcessorClient();

 // Starts the processor in the background. Control returns immediately.
 sessionProcessor.start();

 // Stop processor and dispose when done processing messages.
 sessionProcessor.stop();
 sessionProcessor.close();

Method Summary

Modifier and Type Method and Description
ServiceBusProcessorClient buildProcessorClient()

Creates a session-aware Service Bus processor responsible for reading ServiceBusReceivedMessage from a specific queue or subscription.

ServiceBusSessionProcessorClientBuilder disableAutoComplete()

Disables auto-complete and auto-abandon of received messages.

ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

Sets the amount of time to continue auto-renewing the lock.

ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

Max concurrent messages that this processor should process.

ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)

Enables session processing roll-over by processing at most maxConcurrentSessions sessions concurrently.

ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)

Sets the prefetch count of the processor.

ServiceBusSessionProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)

The error handler for the processor which will be invoked in the event of an error while receiving messages.

ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)

The message processing callback for the processor that will be executed when a message is received.

ServiceBusSessionProcessorClientBuilder queueName(String queueName)

Sets the name of the queue to create a processor for.

ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

Sets the receive mode for the processor.

ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout)

Sets the maximum amount of time to wait for a message to be received for the currently active session.

ServiceBusSessionProcessorClientBuilder subQueue(SubQueue subQueue)

Sets the type of the SubQueue to connect to.

ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)

Sets the name of the subscription in the topic to listen to.

ServiceBusSessionProcessorClientBuilder topicName(String topicName)

Sets the name of the topic.

Method Details

buildProcessorClient

public ServiceBusProcessorClient buildProcessorClient()

Creates a session-aware Service Bus processor responsible for reading ServiceBusReceivedMessage from a specific queue or subscription.

Returns:

A new ServiceBusProcessorClient that receives messages from a queue or subscription.

disableAutoComplete

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder disableAutoComplete()

Disables auto-complete and auto-abandon of received messages. By default, a successfully processed message is completed via complete(). If an error happens while the message is being processed, it is abandoned via abandon().

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.
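With auto-complete and auto-abandon disabled, the message handler is responsible for settling each message. A minimal sketch of such a handler follows. It assumes PEEK_LOCK mode and uses the complete() and abandon() calls on ServiceBusReceivedMessageContext; the class name and the handleOrder method are hypothetical placeholders for application code.

```java
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.util.function.Consumer;

public final class ManualSettlementHandler {
    // Hypothetical application logic; expected to throw on failure.
    static void handleOrder(ServiceBusReceivedMessage message) {
        System.out.printf("Handling message %d from session %s%n",
            message.getSequenceNumber(), message.getSessionId());
    }

    // Handler to pass to processMessage(...) when disableAutoComplete() is set.
    static final Consumer<ServiceBusReceivedMessageContext> ON_MESSAGE = context -> {
        try {
            handleOrder(context.getMessage());
            context.complete();   // settle the message as successfully processed
        } catch (RuntimeException e) {
            context.abandon();    // release the message so it can be delivered again
        }
    };
}
```

Settling in exactly one place per code path keeps a message from being left locked until its lock expires.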

maxAutoLockRenewDuration

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxAutoLockRenewDuration(Duration maxAutoLockRenewDuration)

Sets the amount of time to continue auto-renewing the lock. Setting Duration#ZERO or null disables auto-renewal. For RECEIVE_AND_DELETE mode, auto-renewal is disabled.

A Service Bus queue or subscription in a topic has a lock duration set at the resource level. When the processor client connects to a session in the resource, the broker applies an initial lock to the session, which lasts for that lock duration. If the client does not renew the initial lock before it expires, the session is released and becomes available to other receivers. Each time the client renews the lock, the broker extends it by the resource-level lock duration, so the client must keep renewing the session lock before it expires to keep the session locked. maxAutoLockRenewDuration controls how long the background renewal task runs. Consequently, the most recently renewed lock can remain valid for some time after the renewal task is disposed.

By default, the session lock renewal task will run for 5 minutes.

Parameters:

maxAutoLockRenewDuration - the amount of time to continue auto-renewing the lock. Duration#ZERO or null indicates that auto-renewal is disabled.

Returns:

The updated ServiceBusSessionProcessorClientBuilder object.
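The interaction between the resource-level lock duration and maxAutoLockRenewDuration is easiest to see with numbers. The sketch below is plain arithmetic rather than SDK code: it assumes a hypothetical lock duration of 30 seconds and the 5-minute default renewal window, and computes an upper bound on how long the session lock could remain valid if the final renewal lands just before the renewal task stops. The class and method names are illustrative only.

```java
import java.time.Duration;

public class SessionLockWindow {
    /**
     * Upper bound on how long a session lock can remain valid, assuming the last
     * renewal happens just before the renewal task stops. With auto-renewal
     * disabled (null or zero), only the initial lock applies.
     */
    static Duration latestLockExpiry(Duration lockDuration, Duration maxAutoLockRenewDuration) {
        if (maxAutoLockRenewDuration == null || maxAutoLockRenewDuration.isZero()) {
            return lockDuration;
        }
        return maxAutoLockRenewDuration.plus(lockDuration);
    }

    public static void main(String[] args) {
        Duration lockDuration = Duration.ofSeconds(30); // hypothetical resource-level setting
        Duration renewWindow = Duration.ofMinutes(5);   // default renewal window

        System.out.println(latestLockExpiry(lockDuration, renewWindow));   // PT5M30S
        System.out.println(latestLockExpiry(lockDuration, Duration.ZERO)); // PT30S
    }
}
```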

maxConcurrentCalls

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentCalls(int maxConcurrentCalls)

Max concurrent messages that this processor should process.

This setting allows the application to configure the number of concurrent calls to the message processing callback ServiceBusSessionProcessorClientBuilder#processMessage(Consumer) per session, allowing parallel processing of multiple messages across sessions.

Parameters:

maxConcurrentCalls - max concurrent messages that this processor should process.

Returns:

The updated ServiceBusSessionProcessorClientBuilder object.
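When the callback can run on several threads at once, any state it shares must be thread-safe. The following sketch does not use the SDK: a fixed thread pool stands in for the processor's concurrent invocations, and the handler, keyed by a hypothetical session id string, tallies messages in a ConcurrentHashMap. All names here are illustrative assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

public class ConcurrentHandlerSketch {
    /**
     * Invokes a counting handler messagesPerSession times for each session on a
     * pool of the given size and returns the per-session tallies.
     */
    static Map<String, LongAdder> simulate(int concurrency, int sessions, int messagesPerSession) {
        // Shared state touched by every handler invocation; must be thread-safe.
        Map<String, LongAdder> counts = new ConcurrentHashMap<>();

        // Stand-in for a message handler; the argument is a hypothetical session id.
        Consumer<String> onMessage =
            sessionId -> counts.computeIfAbsent(sessionId, id -> new LongAdder()).increment();

        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (int s = 0; s < sessions; s++) {
            String sessionId = "session-" + s;
            for (int m = 0; m < messagesPerSession; m++) {
                pool.submit(() -> onMessage.accept(sessionId));
            }
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for handlers", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, LongAdder> counts = simulate(4, 2, 1000);
        counts.forEach((id, n) -> System.out.println(id + " -> " + n.sum()));
    }
}
```

Replacing the map with a plain HashMap, or the LongAdder with a long field, would make the tallies unreliable as soon as more than one invocation runs at a time.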

maxConcurrentSessions

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConcurrentSessions)

Enables session processing roll-over by processing at most maxConcurrentSessions sessions concurrently.

Parameters:

maxConcurrentSessions - Maximum number of concurrent sessions to process at any given time.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

prefetchCount

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder prefetchCount(int prefetchCount)

Sets the prefetch count of the processor. For both PEEK_LOCK and RECEIVE_AND_DELETE modes the default value is 0. Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when and before the application starts the processor. Setting a non-zero value prefetches that number of messages; setting the value to zero turns prefetch off. A non-zero prefetch improves performance but risks losing messages.

Parameters:

prefetchCount - The prefetch count.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

processError

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError)

The error handler for the processor which will be invoked in the event of an error while receiving messages.

Parameters:

processError - The error handler which will be executed when an error occurs.

Returns:

The updated ServiceBusSessionProcessorClientBuilder object.

processMessage

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage)

The message processing callback for the processor that will be executed when a message is received.

Parameters:

processMessage - The message processing consumer that will be executed when a message is received.

Returns:

The updated ServiceBusSessionProcessorClientBuilder object.

queueName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder queueName(String queueName)

Sets the name of the queue to create a processor for.

Parameters:

queueName - Name of the queue.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

receiveMode

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode)

Sets the receive mode for the processor.

Parameters:

receiveMode - Mode for receiving messages.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

sessionIdleTimeout

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder sessionIdleTimeout(Duration sessionIdleTimeout)

Sets the maximum amount of time to wait for a message to be received for the currently active session. After this time has elapsed, the processor will close the session and attempt to process another session.

After the processor delivers a message to the processMessage(Consumer<ServiceBusReceivedMessageContext> processMessage) handler, the session times out if the processor cannot receive the next message within the sessionIdleTimeout. This happens either because the session has no further messages or because processing the current message takes longer than the sessionIdleTimeout. To avoid inadvertently losing sessions, choose a sessionIdleTimeout greater than the processing time of a message.

If not specified, the AmqpRetryOptions#getTryTimeout() will be used.

Parameters:

sessionIdleTimeout - Session idle timeout.

Returns:

The updated ServiceBusSessionProcessorClientBuilder object.
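One way to follow the guidance above is to derive the idle timeout from an estimate of the per-message processing time. The sketch below assumes a hypothetical workload in which a message takes up to about 20 seconds to process and sets the timeout to three times that figure; the class name, the method parameters, and the multiplier are illustrative choices, not recommendations from the library.

```java
import com.azure.core.credential.TokenCredential;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.time.Duration;
import java.util.function.Consumer;

public final class IdleTimeoutConfig {
    static ServiceBusProcessorClient build(
            String fullyQualifiedNamespace,
            TokenCredential credential,
            String sessionEnabledQueueName,
            Consumer<ServiceBusReceivedMessageContext> onMessage,
            Consumer<ServiceBusErrorContext> onError) {

        // Hypothetical estimate of the slowest message handling time.
        Duration worstCaseProcessing = Duration.ofSeconds(20);

        return new ServiceBusClientBuilder()
            .credential(fullyQualifiedNamespace, credential)
            .sessionProcessor()
            .queueName(sessionEnabledQueueName)
            // Keep the idle timeout comfortably above the per-message processing time.
            .sessionIdleTimeout(worstCaseProcessing.multipliedBy(3))
            .processMessage(onMessage)
            .processError(onError)
            .buildProcessorClient();
    }
}
```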

subQueue

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subQueue(SubQueue subQueue)

Sets the type of the SubQueue to connect to. Azure Service Bus queues and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ).

Parameters:

subQueue - The type of the sub queue.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

subscriptionName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder subscriptionName(String subscriptionName)

Sets the name of the subscription in the topic to listen to. topicName(String topicName) must also be set.

Parameters:

subscriptionName - Name of the subscription.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.

topicName

public ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder topicName(String topicName)

Sets the name of the topic. subscriptionName(String subscriptionName) must also be set.

Parameters:

topicName - Name of the topic.

Returns:

The modified ServiceBusSessionProcessorClientBuilder object.
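Because topicName and subscriptionName must be set together, a processor for a session-enabled subscription is configured with both calls in place of queueName. The following sketch mirrors the queue example at the top of this page; the class name and method parameters are placeholders supplied by the caller.

```java
import com.azure.core.credential.TokenCredential;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import java.util.function.Consumer;

public final class SubscriptionSessionProcessor {
    static ServiceBusProcessorClient build(
            String fullyQualifiedNamespace,
            TokenCredential credential,
            String topicName,
            String subscriptionName,
            Consumer<ServiceBusReceivedMessageContext> onMessage,
            Consumer<ServiceBusErrorContext> onError) {

        return new ServiceBusClientBuilder()
            .credential(fullyQualifiedNamespace, credential)
            .sessionProcessor()
            // Both values are required when reading from a subscription.
            .topicName(topicName)
            .subscriptionName(subscriptionName)
            .processMessage(onMessage)
            .processError(onError)
            .buildProcessorClient();
    }
}
```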
