EventHubConsumerClient Class

  • java.lang.Object
    • com.azure.messaging.eventhubs.EventHubConsumerClient

Implements

public class EventHubConsumerClient
implements Closeable

A synchronous consumer responsible for reading EventData from an Event Hub partition in the context of a specific consumer group.

Most receive operations contain a parameter maxWaitTime. The iterable is returned when either maxWaitTime has elapsed or numberOfEvents have been received. It is possible to have an empty iterable if no events were received in that time frame. receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition) does not have a parameter for maxWaitTime, consequently, it can take a long time to return results if numberOfEvents is too high and there is low traffic in that Event Hub.

The examples shown in this document use a credential object named DefaultAzureCredential for authentication, which is appropriate for most scenarios, including local development and production environments. Additionally, we recommend using managed identity for authentication in production environments. You can find more information on different ways of authenticating and their corresponding credential types in the Azure Identity documentation".

Sample: Creating a synchronous consumer

The following code sample demonstrates the creation of the synchronous client EventHubConsumerClient. The fullyQualifiedNamespace is the Event Hubs Namespace's host name. It is listed under the "Essentials" panel after navigating to the Event Hubs Namespace via Azure Portal. The consumerGroup is found by navigating to the Event Hub instance, and selecting "Consumer groups" under the "Entities" panel. The consumerGroup(String consumerGroup) is required for creating consumer clients.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubConsumerClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildConsumerClient();

Sample: Consuming events from a single partition

Events from a single partition can be consumed using receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition) or receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime). The call to receiveFromPartition completes and returns an IterableStream<T> when either the maximum number of events is received, or the timeout has elapsed. It is possible to have an empty iterable returned if there were no events received in that duration.

TokenCredential credential = new DefaultAzureCredentialBuilder().build();

 // "<<fully-qualified-namespace>>" will look similar to "{your-namespace}.servicebus.windows.net"
 // "<<event-hub-name>>" will be the name of the Event Hub instance you created inside the Event Hubs namespace.
 EventHubConsumerClient consumer = new EventHubClientBuilder()
     .credential("<<fully-qualified-namespace>>", "<<event-hub-name>>",
         credential)
     .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
     .buildConsumerClient();

 Instant twelveHoursAgo = Instant.now().minus(Duration.ofHours(12));
 EventPosition startingPosition = EventPosition.fromEnqueuedTime(twelveHoursAgo);
 String partitionId = "0";

 // Reads events from partition '0' and returns the first 100 received or until the 30 seconds has elapsed.
 IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 100,
     startingPosition, Duration.ofSeconds(30));

 Long lastSequenceNumber = -1L;
 for (PartitionEvent partitionEvent : events) {
     // For each event, perform some sort of processing.
     System.out.print("Event received: " + partitionEvent.getData().getSequenceNumber());
     lastSequenceNumber = partitionEvent.getData().getSequenceNumber();
 }

 // Figure out what the next EventPosition to receive from is based on last event we processed in the stream.
 // If lastSequenceNumber is -1L, then we didn't see any events the first time we fetched events from the
 // partition.
 if (lastSequenceNumber != -1L) {
     EventPosition nextPosition = EventPosition.fromSequenceNumber(lastSequenceNumber, false);

     // Gets the next set of events from partition '0' to consume and process.
     IterableStream<PartitionEvent> nextEvents = consumer.receiveFromPartition(partitionId, 100,
         nextPosition, Duration.ofSeconds(30));
 }

Method Summary

Modifier and Type Method and Description
void close()
String getConsumerGroup()

Gets the consumer group this consumer is reading events as a part of.

String getEventHubName()

Gets the Event Hub name this client interacts with.

EventHubProperties getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

String getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with.

String getIdentifier()

Gets the client identifier.

IterableStream<String> getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

PartitionProperties getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition)

Receives a batch of PartitionEvent from the Event Hub partition.

IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime)

Receives a batch of PartitionEvent from the Event Hub partition.

IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions)

Receives a batch of PartitionEvent from the Event Hub partition.

Methods inherited from java.lang.Object

Method Details

close

public void close()

getConsumerGroup

public String getConsumerGroup()

Gets the consumer group this consumer is reading events as a part of.

Returns:

The consumer group this consumer is reading events as a part of.

getEventHubName

public String getEventHubName()

Gets the Event Hub name this client interacts with.

Returns:

The Event Hub name this client interacts with.

getEventHubProperties

public EventHubProperties getEventHubProperties()

Retrieves information about an Event Hub, including the number of partitions present and their identifiers.

Returns:

The set of information for the Event Hub that this client is associated with.

getFullyQualifiedNamespace

public String getFullyQualifiedNamespace()

Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to {yournamespace}.servicebus.windows.net.

Returns:

The fully qualified Event Hubs namespace that the connection is associated with.

getIdentifier

public String getIdentifier()

Gets the client identifier.

Returns:

The unique identifier string for current client.

getPartitionIds

public IterableStream getPartitionIds()

Retrieves the identifiers for the partitions of an Event Hub.

Returns:

The set of identifiers for the partitions of an Event Hub.

getPartitionProperties

public PartitionProperties getPartitionProperties(String partitionId)

Retrieves information about a specific partition for an Event Hub, including elements that describe the available events in the partition event stream.

Parameters:

partitionId - The unique identifier of a partition associated with the Event Hub.

Returns:

The set of information for the requested partition under the Event Hub this client is associated with.

receiveFromPartition

public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition)

Receives a batch of PartitionEvent from the Event Hub partition.

Parameters:

partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.

Returns:

A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events. If a stream for the events was opened before, the same position within that partition is returned. Otherwise, events are read starting from startingPosition.

receiveFromPartition

public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime)

Receives a batch of PartitionEvent from the Event Hub partition.

Parameters:

partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.
maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.

Returns:

A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events.

receiveFromPartition

public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount, EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions)

Receives a batch of PartitionEvent from the Event Hub partition.

Parameters:

partitionId - Identifier of the partition to read events from.
maximumMessageCount - The maximum number of messages to receive in this batch.
startingPosition - Position within the Event Hub partition to begin consuming events.
maximumWaitTime - The maximum amount of time to wait to build up the requested message count for the batch; if not specified, the default wait time specified when the consumer was created will be used.
receiveOptions - Options when receiving events from the partition.

Returns:

A set of PartitionEvent that was received. The iterable contains up to maximumMessageCount events.

Applies to