EventHubConsumerClient Class
- java.lang.Object
  - com.azure.messaging.eventhubs.EventHubConsumerClient
Implements
Closeable
public class EventHubConsumerClient implements Closeable
A synchronous consumer responsible for reading EventData from an Event Hub partition in the context of a specific consumer group.
Most receive operations contain a parameter maxWaitTime (declared as maximumWaitTime in the method signatures). The iterable is returned when either maxWaitTime has elapsed or numberOfEvents (declared as maximumMessageCount) have been received. It is possible to get an empty iterable if no events were received in that time frame. receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition) does not have a parameter for maxWaitTime; consequently, it can take a long time to return results if numberOfEvents is too high and there is low traffic in that Event Hub.
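The "count or timeout" return rule described above can be modelled with plain JDK classes. This is a minimal sketch rather than the SDK's implementation: the class BoundedBatchSketch, its receive method, and the BlockingQueue standing in for a partition are all hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in that models the "count or timeout" return rule; not SDK code. */
final class BoundedBatchSketch {

    /**
     * Collects up to maxCount items and returns early once maxWaitTime has elapsed.
     * The result may be empty if nothing arrived in time.
     */
    static <T> List<T> receive(BlockingQueue<T> source, int maxCount, Duration maxWaitTime) {
        List<T> batch = new ArrayList<>();
        long deadline = System.nanoTime() + maxWaitTime.toNanos();
        while (batch.size() < maxCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // the wait time elapsed before the batch filled up
            }
            T item;
            try {
                item = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                break;
            }
            if (item == null) {
                break; // timed out while waiting for the next item
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        // Enough items are queued, so the call returns as soon as two are collected.
        System.out.println(receive(queue, 2, Duration.ofSeconds(5)));
        // Only one item is left, so the call returns it once the short wait elapses.
        System.out.println(receive(queue, 5, Duration.ofMillis(100)));
        // Nothing is left, so the result is an empty list.
        System.out.println(receive(queue, 5, Duration.ofMillis(100)));
    }
}
```

In this model, a very long wait combined with a large count and a quiet source blocks for a correspondingly long time, which mirrors the caution above about the overload without a wait-time parameter.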
The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on the different ways of authenticating and their corresponding credential types in the Azure Identity documentation.
Sample: Creating a synchronous consumer
The following code sample demonstrates the creation of the synchronous client EventHubConsumerClient. The fullyQualifiedNamespace is the Event Hubs Namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The consumerGroup is found by navigating to the Event Hub instance, and selecting "Consumer groups" under the "Entities" panel. The consumerGroup(String consumerGroup) is required for creating consumer clients.
import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();
Sample: Consuming events from a single partition
Events from a single partition can be consumed using receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition) or receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime). The call to receiveFromPartition completes and returns an IterableStream<T> when either the maximum number of events is received, or the timeout has elapsed. It is possible to have an empty iterable returned if there were no events received in that duration.
import java.time.Duration;
import java.time.Instant;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.IterableStream;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

// "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
// "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
EventHubConsumerClient consumer = new EventHubClientBuilder()
    .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
        credential)
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
String partitionId = "0";

// Reads events from partition '0' and returns once 100 events are received or 30 seconds have elapsed.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
    startingPosition, Duration.ofSeconds(30));

Long lastSequenceNumber = -1L;
for (PartitionEvent partitionEvent : events) {
    // For each event, perform some sort of processing.
    System.out.println("Event received: " + partitionEvent.getData().getSequenceNumber());
    lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
}

// Figure out what the next EventPosition to receive from is, based on the last event we processed in the stream.
// If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
// partition.
if (lastSequenceNumber != -1L) {
    // The second argument (false) makes the position exclusive, so the last processed event is not read again.
    EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

    // Gets the next set of events from partition '0' to consume and process.
    IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
        nextPosition, Duration.ofSeconds(30));
}
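The resume step in the sample above (remember the last sequence number, then start the next receive from an exclusive position) can be illustrated without the SDK. The sketch below is a hypothetical in-memory model: ResumeSketch, receive, and drain are names invented for illustration, and list indexes stand in for sequence numbers. It only demonstrates why an exclusive start position avoids reprocessing the last event.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for one partition; sequence numbers equal list indexes. */
final class ResumeSketch {

    /** Returns up to maxCount sequence numbers after (or at, when inclusive) the given position. */
    static List<Long> receive(List<String> partition, long sequenceNumber, boolean inclusive, int maxCount) {
        List<Long> batch = new ArrayList<>();
        long start = inclusive ? sequenceNumber : sequenceNumber + 1;
        for (long seq = Math.max(start, 0); seq < partition.size() && batch.size() < maxCount; seq++) {
            batch.add(seq);
        }
        return batch;
    }

    /** Reads the whole partition in batches, resuming after the last processed sequence number. */
    static List<Long> drain(List<String> partition, int batchSize) {
        List<Long> processed = new ArrayList<>();
        long lastSequenceNumber = -1L; // nothing processed yet
        while (true) {
            // Exclusive start: the event at lastSequenceNumber is not delivered again.
            List<Long> batch = receive(partition, lastSequenceNumber, false, batchSize);
            if (batch.isEmpty()) {
                break; // in this model, an empty batch means there are no newer events
            }
            processed.addAll(batch);
            lastSequenceNumber = batch.get(batch.size() - 1);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("e0", "e1", "e2", "e3", "e4");
        System.out.println(drain(partition, 2)); // each sequence number appears exactly once
    }
}
```

With the real client, an empty result only means that nothing arrived within the wait time, so a long-running reader would typically keep polling rather than stop as this model does.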
Method Summary
Modifier and Type | Method and Description |
---|---|
void | close() |
String | getConsumerGroup(): Gets the consumer group this consumer is reading events as a part of. |
String | getEventHubName(): Gets the Event Hub name this client interacts with. |
EventHubProperties | getEventHubProperties(): Retrieves information about an Event Hub, including the number of partitions present and their identifiers. |
String | getFullyQualifiedNamespace(): Gets the fully qualified Event Hubs namespace that the connection is associated with. |
String | getIdentifier(): Gets the client identifier. |
IterableStream<String> | getPartitionIds(): Retrieves the identifiers for the partitions of an Event Hub. |
PartitionProperties | getPartitionProperties(String partitionId): Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition): Receives a batch of PartitionEvent from the Event Hub partition. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime): Receives a batch of PartitionEvent from the Event Hub partition. |
IterableStream<PartitionEvent> | receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions): Receives a batch of PartitionEvent from the Event Hub partition. |
Methods inherited from java.lang.Object
Method Details
close
public void close()
getConsumerGroup
public String getConsumerGroup()
Gets the consumer group this consumer is reading events as a part of.
Returns:
getEventHubName
public String getEventHubName()
Gets the Event Hub name this client interacts with.
Returns:
getEventHubProperties
public EventHubProperties getEventHubProperties()
Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
Returns:
getFullyQualifiedNamespace
public String getFullyQualifiedNamespace()
Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.
Returns:
getIdentifier
public String getIdentifier()
Gets the client identifier.
Returns:
getPartitionIds
public IterableStream<String> getPartitionIds()
Retrieves the identifiers for the partitions of an Event Hub.
Returns:
getPartitionProperties
public PartitionProperties getPartitionProperties(String partitionId)
Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.
Parameters:
Returns:
receiveFromPartition
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition)
Receives a batch of PartitionEvent from the Event Hub partition.
Parameters:
Returns:
Up to maximumMessageCount events. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.
receiveFromPartition
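public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime)
Receives a batch of PartitionEvent from the Event Hub partition.
Parameters:
Returns:
Up to maximumMessageCount events.
receiveFromPartition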
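public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions)
Receives a batch of PartitionEvent from the Event Hub partition.
Parameters:
Returns:
Up to maximumMessageCount events.
Applies to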
Azure SDK for Java