EventProcessorClient Class
- java.
lang. Object - com.
azure. messaging. eventhubs. EventProcessorClient
- com.
public class EventProcessorClient
EventProcessorClient provides a convenient mechanism to consume events from all partitions of an Event Hub in the context of a consumer group. Event Processor-based application consists of one or more instances of EventProcessorClient(s) which are set up to consume events from the same Event Hub, consumer group to balance the workload across different instances and track progress when events are processed. Based on the number of instances running, each EventProcessorClient may own zero or more partitions to balance the workload among all the instances.
Sample: Construct an EventProcessorClient
The sample below uses an in-memory CheckpointStore but azure-messaging-eventhubs-checkpointstore-blob provides a checkpoint store backed by Azure Blob Storage. Additionally, fullyQualifiedNamespace
is the Event Hubs Namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The credential used is DefaultAzureCredential
because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment. The consumerGroup
is found by navigating to the Event Hub instance, and selecting "Consumer groups" under the "Entities" panel. The consumerGroup
is required. The credential used is DefaultAzureCredential
because it combines commonly used credentials in deployment and development and chooses the credential to used based on its running environment.
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.checkpointStore(new SampleCheckpointStore())
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.buildEventProcessorClient();
Method Summary
Modifier and Type | Method and Description |
---|---|
String |
getIdentifier()
The identifier is a unique name given to this event processor instance. |
synchronized boolean |
isRunning()
Returns |
synchronized void |
start()
Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a dedicated PartitionProcessor to each partition. |
synchronized void |
stop()
Stops processing events for all partitions owned by this event processor. |
synchronized void |
stop(Duration timeout)
Stops processing events for all partitions owned by this event processor. |
Methods inherited from java.lang.Object
Method Details
getIdentifier
public String getIdentifier()
The identifier is a unique name given to this event processor instance.
Returns:
isRunning
public synchronized boolean isRunning()
Returns true
if the event processor is running. If the event processor is already running, calling start() has no effect.
Returns:
true
if the event processor is running.start
public synchronized void start()
Starts processing of events for all partitions of the Event Hub that this event processor can own, assigning a dedicated PartitionProcessor to each partition. If there are other Event Processors active for the same consumer group on the Event Hub, responsibility for partitions will be shared between them.
Subsequent calls to start will be ignored if this event processor is already running. Calling start after stop() is called will restart this event processor.
Starting the processor to consume events from all partitions
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
stop
public synchronized void stop()
Stops processing events for all partitions owned by this event processor. All PartitionProcessor will be shutdown and any open resources will be closed.
Subsequent calls to stop will be ignored if the event processor is not running or is being stopped.
This method will do the best effort to stop processing gracefully and will block for up to 10 seconds waiting for the processor to stop. Use stop(Duration timeout) overload to specify a different timeout.
Stopping the processor
TokenCredential credential = new DefaultAzureCredentialBuilder().build();
// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
credential)
.processEvent(eventContext -> {
System.out.printf("Partition id = %s and sequence number of event = %s%n",
eventContext.getPartitionContext().getPartitionId(),
eventContext.getEventData().getSequenceNumber());
})
.processError(errorContext -> {
System.out.printf("Error occurred in partition processor for partition %s, %s%n",
errorContext.getPartitionContext().getPartitionId(),
errorContext.getThrowable());
})
.checkpointStore(new SampleCheckpointStore())
.buildEventProcessorClient();
eventProcessorClient.start();
// Continue to perform other tasks while the processor is running in the background.
//
// Finally, stop the processor client when application is finished.
eventProcessorClient.stop();
stop
public synchronized void stop(Duration timeout)
Stops processing events for all partitions owned by this event processor. All PartitionProcessor will be shutdown and any open resources will be closed.
Subsequent calls to stop will be ignored if the event processor is not running or is being stopped.
Parameters:
Applies to
Azure SDK for Java