EventProcessorClientBuilder Class
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClientBuilder
- com.
Implements
public class EventProcessorClientBuilder
implements TokenCredentialTrait<EventProcessorClientBuilder>, AzureNamedKeyCredentialTrait<EventProcessorClientBuilder>, ConnectionStringTrait<EventProcessorClientBuilder>, AzureSasCredentialTrait<EventProcessorClientBuilder>, AmqpTrait<EventProcessorClientBuilder>, ConfigurationTrait<EventProcessorClientBuilder>
This class provides a fluent builder API to help aid the configuration and instantiation of the EventProcessorClient. Calling buildEventProcessorClient() constructs a new instance of EventProcessorClient.
To create an instance of EventProcessorClient, the following fields are required:
CheckpointStore - An implementation of CheckpointStore that stores checkpoint and partition ownership information to enable load balancing and checkpointing processed events.
processEvent(Consumer<EventContext> processEvent) or processEventBatch(Consumer<EventBatchContext> processEventBatch, int maxBatchSize, Duration maxWaitTime) - A callback that processes events received from the Event Hub.
processError(Consumer<ErrorContext> processError) - A callback that handles errors that may occur while running the EventProcessorClient.
Credentials to perform operations against Azure Event Hubs. They can be set by using one of the following methods:
- connectionString(String connectionString) with a connection string to a specific Event Hub.
- connectionString(String connectionString, String eventHubName) with an Event Hub namespace connection string and the Event Hub name.
- credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential) with the fully qualified namespace, Event Hub name, and a set of credentials authorized to use the Event Hub.
- credential(TokenCredential credential), credential(AzureSasCredential credential), or credential(AzureNamedKeyCredential credential) along with fullyQualifiedNamespace(String fullyQualifiedNamespace) and eventHubName(String eventHubName). The fully qualified namespace, Event Hub name, and authorized credentials to use the Event Hub.
The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".
Sample: Construct an EventProcessorClient
The following code sample demonstrates the creation of the processor client. The processor client is recommended for production scenarios because it can load balance between multiple running instances, can perform checkpointing, and reconnects on transient failures such as network outages. The sample below uses an in-memory CheckpointStore but azure-messaging-eventhubs-checkpointstore-blob provides a checkpoint store backed by Azure Blob Storage.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Field Summary
Modifier and Type | Field and Description |
---|---|
static final Duration |
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Default load balancing update interval. |
static final Duration |
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Default ownership expiration. |
Constructor Summary
Constructor | Description |
---|---|
EventProcessorClientBuilder() |
Creates a new instance of EventProcessorClientBuilder. |
Method Summary
Methods inherited from java.lang.Object
Field Details
DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
public static final Duration DEFAULT_LOAD_BALANCING_UPDATE_INTERVAL
Default load balancing update interval. Balancing interval should account for latency between the client and the storage account.
DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
public static final Duration DEFAULT_OWNERSHIP_EXPIRATION_INTERVAL
Default ownership expiration.
Constructor Details
EventProcessorClientBuilder
public EventProcessorClientBuilder()
Creates a new instance of EventProcessorClientBuilder.
Method Details
buildEventProcessorClient
public EventProcessorClient buildEventProcessorClient()
This will create a new EventProcessorClient configured with the options set in this builder. Each call to this method will return a new instance of EventProcessorClient.
All partitions processed by this EventProcessorClient will start processing from earliest() available event in the respective partitions.
Returns:
checkpointStore
public EventProcessorClientBuilder checkpointStore(CheckpointStore checkpointStore)
Sets the CheckpointStore the EventProcessorClient will use for storing partition ownership and checkpoint information.
Users can, optionally, provide their own implementation of CheckpointStore which will store ownership and checkpoint information.
Parameters:
Returns:
clientOptions
public EventProcessorClientBuilder clientOptions(ClientOptions clientOptions)
Sets the client options for the processor client. The application id set on the client options will be used for tracing. The headers set on ClientOptions
are currently not used but can be used in later releases to add to AMQP message.
Parameters:
Returns:
configuration
public EventProcessorClientBuilder configuration(Configuration configuration)
Sets the configuration store that is used during construction of the service client. If not specified, the default configuration store is used to configure the EventHubAsyncClient. Use NONE to bypass using configuration settings during construction.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString)
Sets the credential information given a connection string to the Event Hub instance.
If the connection string is copied from the Event Hubs namespace, it will likely not contain the name to the desired Event Hub, which is needed. In this case, the name can be added manually by adding "EntityPath=EVENT_HUB_NAME" to the end of the connection string. For example, "EntityPath=telemetry-hub".
If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string from that Event Hub will result in a connection string that contains the name.
Parameters:
Returns:
connectionString
public EventProcessorClientBuilder connectionString(String connectionString, String eventHubName)
Sets the credential information given a connection string to the Event Hubs namespace and name to a specific Event Hub instance.
Parameters:
Returns:
consumerGroup
public EventProcessorClientBuilder consumerGroup(String consumerGroup)
Sets the consumer group name from which the EventProcessorClient should consume events.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureNamedKeyCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(AzureSasCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(TokenCredential credential)
Sets the TokenCredential used to authorize requests sent to the service. Refer to the Azure SDK for Java identity and authentication documentation for more details on proper usage of the TokenCredential type.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureNamedKeyCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, AzureSasCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
Parameters:
Returns:
credential
public EventProcessorClientBuilder credential(String fullyQualifiedNamespace, String eventHubName, TokenCredential credential)
Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
Parameters:
Returns:
customEndpointAddress
public EventProcessorClientBuilder customEndpointAddress(String customEndpointAddress)
Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through an intermediary. For example: https://my.custom.endpoint.com:55300.
If no port is specified, the default port for the transportType(AmqpTransportType transport) is used.
Parameters:
Returns:
eventHubName
public EventProcessorClientBuilder eventHubName(String eventHubName)
Sets the name of the Event Hub to connect the client to.
Parameters:
Returns:
fullyQualifiedNamespace
public EventProcessorClientBuilder fullyQualifiedNamespace(String fullyQualifiedNamespace)
Sets the fully qualified name for the Event Hubs namespace.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Map
Sets the map containing the event position to use for each partition if a checkpoint for the partition does not exist in CheckpointStore. This map is keyed off of the partition id.
Only one overload of initialPartitionEventPosition
should be used when constructing an EventProcessorClient.
Parameters:
Returns:
initialPartitionEventPosition
public EventProcessorClientBuilder initialPartitionEventPosition(Function
Sets the default starting position for each partition if a checkpoint for that partition does not exist in the CheckpointStore.
Only one overload of initialPartitionEventPosition
should be used when constructing an EventProcessorClient.
Parameters:
Returns:
loadBalancingStrategy
public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy)
The LoadBalancingStrategy the EventProcessorClient will use for claiming partition ownership. By default, a BALANCED approach will be used.
Parameters:
Returns:
loadBalancingUpdateInterval
public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration loadBalancingUpdateInterval)
The time interval between load balancing update cycles. This is also generally the interval at which ownership of partitions are renewed. By default, this interval is set to 10 seconds.
Parameters:
Returns:
partitionOwnershipExpirationInterval
public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration partitionOwnershipExpirationInterval)
The time duration after which the ownership of partition expires if it's not renewed by the owning processor instance. This is the duration that this processor instance will wait before taking over the ownership of partitions previously owned by an inactive processor. By default, this duration is set to a minute.
Parameters:
Returns:
prefetchCount
public EventProcessorClientBuilder prefetchCount(int prefetchCount)
Sets the count used by the receivers to control the number of events each consumer will actively receive and queue locally without regard to whether a receive operation is currently active.
Parameters:
Returns:
processError
public EventProcessorClientBuilder processError(Consumer
The function that is called when an error occurs while processing events. The input contains the partition information where the error happened.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data.
Parameters:
Returns:
processEvent
public EventProcessorClientBuilder processEvent(Consumer
The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
Parameters:
Returns:
processEventBatch
public EventProcessorClientBuilder processEventBatch(Consumer
The function that is called for each event received by this EventProcessorClient. The input contains the partition context and the event data. If the max wait time is set, the receive will wait for that duration to receive an event and if is no event received, the consumer will be invoked with a null event data.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEventBatch(eventBatchContext -> {
eventBatchContext.getEvents().forEach(eventData -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventBatchContext.getPartitionContext().getPartitionId(),
eventData.getSequenceNumber());
});
}, 50, Duration.ofSeconds(30))
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Parameters:
Returns:
processPartitionClose
public EventProcessorClientBuilder processPartitionClose(Consumer
The function that is called when a processing for a partition stops. The input contains the partition information along with the reason for stopping the event processing for this partition.
Parameters:
Returns:
processPartitionInitialization
public EventProcessorClientBuilder processPartitionInitialization(Consumer
The function that is called before processing starts for a partition. The input contains the partition information along with a default starting position for processing events that will be used in the case of a checkpoint unavailable in CheckpointStore. Users can update this position if a different starting position is preferred.
Parameters:
Returns:
proxyOptions
public EventProcessorClientBuilder proxyOptions(ProxyOptions proxyOptions)
Sets the proxy configuration to use for EventHubAsyncClient. When a proxy is configured, AMQP_WEB_SOCKETS must be used for the transport type.
Parameters:
Returns:
retry
@Deprecated
public EventProcessorClientBuilder retry(AmqpRetryOptions retryOptions)
Deprecated
Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.
Parameters:
Returns:
retryOptions
public EventProcessorClientBuilder retryOptions(AmqpRetryOptions retryOptions)
Sets the retry policy for EventHubAsyncClient. If not specified, the default retry options are used.
Parameters:
Returns:
trackLastEnqueuedEventProperties
public EventProcessorClientBuilder trackLastEnqueuedEventProperties(boolean trackLastEnqueuedEventProperties)
Sets whether or not the event processor should request information on the last enqueued event on its associated partition, and track that information as events are received.
When information about the partition's last enqueued event is being tracked, each event received from the Event Hubs service will carry metadata about the partition that it otherwise would not. This results in a small amount of additional network bandwidth consumption that is generally a favorable trade-off when considered against periodically making requests for partition properties using the Event Hub client.
Parameters:
true
if the resulting events will keep track of the last
enqueued information for that partition; false
otherwise.
Returns:
transportType
public EventProcessorClientBuilder transportType(AmqpTransportType transport)
Sets the transport type by which all the communication with Azure Event Hubs occurs. Default value is AMQP.
Parameters:
Returns: