Azure Event Hubs Checkpoint Store client library for Java - version 1.0.0-beta.3
using the Jedis Client Library for Redis
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
This package makes use of Redis as a persistent store for maintaining checkpoints and partition ownership information.
The JedisRedisCheckpointStore
provided in this package can be plugged in to EventProcessorClient
.
Source code| API reference documentation | Product documentation | Samples
Getting started
Prerequisites
- A Java Development Kit (JDK), version 8 or later.
- Maven
- Microsoft Azure subscription
- You can create a free account at: https://azure.microsoft.com
- Azure Event Hubs instance
- Step-by-step guide for creating an Event Hub using the Azure Portal
- Azure Redis Cache or a suitable alternative Redis server
- Step-by-step guide for creating a Redis Cache using the Azure Portal
Include the package
Include the BOM file
Please include the azure-sdk-bom to your project to take dependency on the General Availability (GA) version of the library. In the following snippet, replace the {bom_version_to_target} placeholder with the version number. To learn more about the BOM, see the AZURE SDK BOM README.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
and then include the direct dependency in the dependencies section without the version tag as shown below.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
</dependency>
</dependencies>
Include direct dependency
If you want to take dependency on a particular version of the library that is not present in the BOM, add the direct dependency to your project as follows.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
<version>1.0.0-beta.3</version>
</dependency>
Authenticate the storage container client
In order to create an instance of JedisCheckpointStore
, a JedisPool
object must be created. To make this JedisPool
object, a hostname String and a primary key String are required. These can be used as shown below to create a
JedisPool
object.
Key concepts
Key concepts are explained in detail here.
Examples
- Create and run an instance of JedisRedisCheckpointStore
- Consume events from all Event Hub partitions
Create an instance of JedisPool
To create an instance of JedisPool using Azure Redis Cache, follow the instructions in Use Azure Cache for Redis in Java to fetch the hostname and access key. Otherwise, use connection information from a running Redis instance.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
// Do things with JedisPool.
// Finally, dispose of resource
jedisPool.close();
Consume events using an Event Processor Client
To consume events for all partitions of an Event Hub, you'll create an
EventProcessorClient
for a specific consumer group. When an Event Hub is created, it
provides a default consumer group that can be used to get started.
The EventProcessorClient
will delegate processing of events to a callback function
that you provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility
for managing the underlying consumer operations.
In our example, we will focus on building the EventProcessor
, use the
JedisRedisCheckpointStore
, and a simple callback function to process the events
received from the Event Hubs, writes to console and updates the checkpoint in Blob storage after each event.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
.eventHubName("<< EVENT HUB NAME >>")
.checkpointStore(new JedisCheckpointStore(jedisPool))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(context -> {
System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
// Your application will probably keep the eventProcessorClient alive until the program ends.
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
// Dispose of JedisPool resource.
jedisPool.close();
Troubleshooting
Enable client logging
Azure SDK for Java offers a consistent logging story to help aid in troubleshooting application errors and expedite their resolution. The logs produced will capture the flow of an application before reaching the terminal state to help locate the root issue. View the logging wiki for guidance about enabling logging.
Next steps
Get started by exploring the samples here.
Contributing
If you would like to become an active contributor to this project please refer to our Contribution Guidelines for more information.