EventProcessorClient Class

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventProcessorClient

public class EventProcessorClient

EventProcessorClient provides a convenient mechanism to consume events from all partitions of an Event Hub in the context of a consumer group. An event processor-based application consists of one or more EventProcessorClient instances that are set up to consume events from the same Event Hub and consumer group. The instances balance the workload among themselves and track progress as events are processed. Depending on the number of instances running, each EventProcessorClient may own zero or more partitions.
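To make "zero or more partitions" concrete, the following is a toy model that deals partition ids out to instances in round-robin order. It is only an illustration under stated assumptions: the class PartitionSpreadSketch, its distribute method, and the instance names are hypothetical, and the sketch does not reproduce the load-balancing logic that EventProcessorClient actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT the SDK's load-balancing algorithm.
final class PartitionSpreadSketch {

    // Deals partition ids out to processor instances in round-robin order.
    static Map<String, List<String>> distribute(List<String> partitionIds, List<String> instanceIds) {
        if (instanceIds.isEmpty()) {
            throw new IllegalArgumentException("At least one instance is required.");
        }
        // LinkedHashMap keeps the instances in the order they were supplied.
        Map<String, List<String>> ownership = new LinkedHashMap<>();
        for (String instanceId : instanceIds) {
            ownership.put(instanceId, new ArrayList<>());
        }
        for (int i = 0; i < partitionIds.size(); i++) {
            String owner = instanceIds.get(i % instanceIds.size());
            ownership.get(owner).add(partitionIds.get(i));
        }
        return ownership;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("0", "1", "2", "3");
        // Two instances share four partitions evenly.
        System.out.println(distribute(partitions, Arrays.asList("processor-a", "processor-b")));
        // With more instances than partitions, the last instance owns none.
        System.out.println(distribute(partitions, Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

With four partitions and two instances, each instance ends up with two partitions. With five instances, the fifth owns none, which is the "zero" case described above.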

Sample: Construct an EventProcessorClient

The sample below uses an in-memory CheckpointStore, but azure-messaging-eventhubs-checkpointstore-blob provides a checkpoint store backed by Azure Blob Storage. The fullyQualifiedNamespace is the Event Hubs Namespace's host name; it is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via the Azure Portal. The consumerGroup is required; it is found by navigating to the Event Hub instance and selecting "Consumer groups" under the "Entities" panel. The credential used is DefaultAzureCredential because it combines commonly used credentials in deployment and development and chooses the credential to use based on its running environment.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup("<< CONSUMER GROUP NAME >>")
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .checkpointStore(new SampleCheckpointStore())
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .buildEventProcessorClient();

Method Summary

Modifier and Type Method and Description
String getIdentifier()

The identifier is a unique name given to this event processor instance.

synchronized boolean isRunning()

Returns true if the event processor is running.

synchronized void start()

Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a dedicated PartitionProcessor to each partition.

synchronized void stop()

Stops processing events for all partitions owned by this event processor.

synchronized void stop(Duration timeout)

Stops processing events for all partitions owned by this event processor.


Method Details

getIdentifier

public String getIdentifier()

The identifier is a unique name given to this event processor instance.

Returns:

Identifier for this event processor.

isRunning

public synchronized boolean isRunning()

Returns true if the event processor is running. If the event processor is already running, calling start() has no effect.

Returns:

true if the event processor is running.

start

public synchronized void start()

Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a dedicated PartitionProcessor to each partition. If there are other Event Processors active for the same consumer group on the Event Hub, responsibility for partitions will be shared between them.

Subsequent calls to start will be ignored if this event processor is already running. Calling start after stop() is called will restart this event processor.
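The start and stop semantics described above can be sketched with a small stand-in class. This is an illustration only: ProcessorLifecycleSketch and its getTimesStarted counter are hypothetical names introduced here, the class does not connect to Event Hubs, and it is not how EventProcessorClient is implemented.

```java
// Hypothetical stand-in used only to illustrate the documented semantics.
// It is not EventProcessorClient and does not talk to Event Hubs.
final class ProcessorLifecycleSketch {
    private boolean running;
    private int timesStarted;

    // Starts the processor; a call made while already running is ignored.
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        timesStarted++;
    }

    // Stops the processor; a call made while not running has no effect.
    synchronized void stop() {
        running = false;
    }

    synchronized boolean isRunning() {
        return running;
    }

    // Counts how many times start() actually took effect.
    synchronized int getTimesStarted() {
        return timesStarted;
    }

    public static void main(String[] args) {
        ProcessorLifecycleSketch processor = new ProcessorLifecycleSketch();
        processor.start();
        processor.start(); // ignored: already running
        System.out.println(processor.getTimesStarted()); // 1
        processor.stop();
        processor.start(); // restarts after stop
        System.out.println(processor.getTimesStarted()); // 2
    }
}
```

The counter only advances when a start call changes state, so the second consecutive start leaves it at 1 and the start after stop moves it to 2.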

Starting the processor to consume events from all partitions

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when the application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop()

Stops processing events for all partitions owned by this event processor. All PartitionProcessor instances will be shut down and any open resources will be closed.

Subsequent calls to stop will be ignored if the event processor is not running or is being stopped.

This method makes a best effort to stop processing gracefully and blocks for up to 10 seconds waiting for the processor to stop. Use the stop(Duration timeout) overload to specify a different timeout.

Stopping the processor

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .processEvent(eventContext -> {
         System.out.printf("Partition id = %s and sequence number of event = %s%n",
             eventContext.getPartitionContext().getPartitionId(),
             eventContext.getEventData().getSequenceNumber());
     })
     .processError(errorContext -> {
         System.out.printf("Error occurred in partition processor for partition %s, %s%n",
             errorContext.getPartitionContext().getPartitionId(),
             errorContext.getThrowable());
     })
     .checkpointStore(new SampleCheckpointStore())
     .buildEventProcessorClient();

 eventProcessorClient.start();

 // Continue to perform other tasks while the processor is running in the background.
 //
 // Finally, stop the processor client when the application is finished.
 eventProcessorClient.stop();

stop

public synchronized void stop(Duration timeout)

Stops processing events for all partitions owned by this event processor. All PartitionProcessor instances will be shut down and any open resources will be closed.

Subsequent calls to stop will be ignored if the event processor is not running or is being stopped.

Parameters:

timeout - The maximum amount of time to wait for the processor to stop processing.
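The idea of waiting up to a maximum amount of time for processing to stop can be sketched with the standard java.util.concurrent API. This is a minimal model under stated assumptions: BoundedStopSketch, startWorker, and this stop helper are hypothetical names, the worker only simulates in-flight event handling with a sleep, and nothing here reflects the internals of EventProcessorClient.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: models "wait up to a timeout for workers to stop".
// It does not reflect how EventProcessorClient is implemented internally.
final class BoundedStopSketch {

    // Starts one worker that simulates in-flight event handling for workMillis.
    static ExecutorService startWorker(long workMillis) {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        workers.execute(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                // Forced stop: restore the interrupt flag and exit promptly.
                Thread.currentThread().interrupt();
            }
        });
        return workers;
    }

    // Returns true if the worker finished within the timeout, false otherwise.
    static boolean stop(ExecutorService workers, Duration timeout) {
        // Graceful step: accept no new work, let in-flight work finish.
        workers.shutdown();
        try {
            if (workers.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Timed out or interrupted: stop waiting and interrupt the worker.
        workers.shutdownNow();
        return false;
    }

    public static void main(String[] args) {
        // Short work with a generous timeout stops gracefully.
        System.out.println(stop(startWorker(50), Duration.ofSeconds(5)));
        // Long work with a short timeout is cut off after the wait expires.
        System.out.println(stop(startWorker(30_000), Duration.ofMillis(100)));
    }
}
```

In this model, a larger timeout gives in-flight work more room to complete, while a smaller one returns control to the caller sooner.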
