Azure Event Hubs Checkpoint Store-Clientbibliothek für Java– Version 1.0.0-beta.2
Verwenden der Jedis-Clientbibliothek für Redis
Azure Event Hubs Prüfpunktspeicher kann zum Speichern von Prüfpunkten verwendet werden, während Ereignisse von Azure Event Hubs verarbeitet werden.
Dieses Paket verwendet Redis als beständigen Speicher zum Verwalten von Prüfpunkten und Partitionsbesitzinformationen.
Der JedisRedisCheckpointStore
in diesem Paket bereitgestellte kann an EventProcessorClient
angeschlossen werden.
Quellcode| API-Referenzdokumentation | Produktdokumentation | Beispiele
Erste Schritte
Voraussetzungen
- Java Development Kit (JDK), Version 8 oder höher.
- Maven
- Microsoft Azure-Abonnement
- Sie können ein kostenloses Konto erstellen unter: https://azure.microsoft.com
- Azure Event Hubs instance
- Schritt-für-Schritt-Anleitung zum Erstellen eines Event Hubs über das Azure-Portal
- Azure Redis Cache oder ein geeigneter alternativer Redis-Server
- Schritt-für-Schritt-Anleitung zum Erstellen eines Redis Cache mithilfe des Azure-Portals
Einschließen des Pakets
BOM-Datei einfügen
Fügen Sie azure-sdk-bom in Ihr Projekt ein, um von der Allgemeinverfügbarkeitsversion der Bibliothek abhängig zu sein. Ersetzen Sie im folgenden Codeausschnitt den Platzhalter {bom_version_to_target} durch die Versionsnummer. Weitere Informationen zur BOM finden Sie in der INFODATEI FÜR AZURE SDK-STÜCKLISTEN.
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>{bom_version_to_target}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
und fügen Sie dann die direkte Abhängigkeit wie unten dargestellt ohne das Versionstag in den Abschnitt abhängigkeiten ein.
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
</dependency>
</dependencies>
Direkte Abhängigkeiten einfügen
Wenn Sie eine Abhängigkeit von einer bestimmten Version der Bibliothek annehmen möchten, die nicht in der BoM vorhanden ist, fügen Sie die direkte Abhängigkeit wie folgt zu Ihrem Projekt hinzu.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
<version>1.0.0-beta.2</version>
</dependency>
Authentifizieren des Speichercontainerclients
Um eine instance von JedisCheckpointStore
zu erstellen, muss ein JedisPool
-Objekt erstellt werden. Um dieses JedisPool
Objekt zu erstellen, sind ein Hostname String und ein Primärschlüssel String erforderlich. Diese können wie unten gezeigt verwendet werden, um ein JedisPool
-Objekt zu erstellen.
Wichtige Begriffe
Wichtige Konzepte werden hier ausführlich erläutert.
Beispiele
- Erstellen und Ausführen einer instance von JedisRedisCheckpointStore
- Nutzen von Ereignissen aus allen Event Hub-Partitionen
Erstellen einer instance von JedisPool
Um eine instance von JedisPool mit Azure Redis Cache zu erstellen, befolgen Sie die Anweisungen unter Verwenden von Azure Cache for Redis in Java zum Abrufen des Hostnamens und Zugriffsschlüssels. Verwenden Sie andernfalls Verbindungsinformationen aus einer ausgeführten Redis-instance.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
// Do things with JedisPool.
// Finally, dispose of resource
jedisPool.close();
Nutzen von Ereignissen mithilfe eines Ereignisprozessorclients
Um Ereignisse für alle Partitionen eines Event Hubs zu nutzen, erstellen Sie eine EventProcessorClient
für eine bestimmte Consumergruppe. Wenn ein Event Hub erstellt wird, wird eine Standardconsumergruppe bereitgestellt, die für die ersten Schritte verwendet werden kann.
Die EventProcessorClient
delegieren die Verarbeitung von Ereignissen an eine von Ihnen bereitgestellte Rückruffunktion, sodass Sie sich auf die Logik konzentrieren können, die zum Bereitstellen von Werten erforderlich ist, während der Prozessor für die Verwaltung der zugrunde liegenden Consumervorgänge verantwortlich ist.
In unserem Beispiel konzentrieren wir uns auf das Erstellen von EventProcessor
, verwenden JedisRedisCheckpointStore
sie und eine einfache Rückruffunktion, um die von den Event Hubs empfangenen Ereignisse zu verarbeiten, in die Konsole zu schreiben und den Prüfpunkt im Blobspeicher nach jedem Ereignis zu aktualisieren.
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
.password("<YOUR_REDIS_PRIMARY_ACCESS_KEY>")
.ssl(true)
.build();
String redisHostName = "<YOUR_REDIS_HOST_NAME>.redis.cache.windows.net";
HostAndPort hostAndPort = new HostAndPort(redisHostName, 6380);
JedisPool jedisPool = new JedisPool(hostAndPort, clientConfig);
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
.consumerGroup("<< CONSUMER GROUP NAME >>")
.connectionString("<< EVENT HUB NAMESPACE CONNECTION STRING >>")
.eventHubName("<< EVENT HUB NAME >>")
.checkpointStore(new JedisCheckpointStore(jedisPool))
.processEvent(eventContext -> {
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
})
.processError(context -> {
System.out.println("Error occurred while processing events " + context.getThrowable().getMessage());
})
.buildEventProcessorClient();
// This will start the processor. It will start processing events from all partitions.
eventProcessorClient.start();
// (for demo purposes only - adding sleep to wait for receiving events)
// Your application will probably keep the eventProcessorClient alive until the program ends.
TimeUnit.SECONDS.sleep(2);
// When the user wishes to stop processing events, they can call `stop()`.
eventProcessorClient.stop();
// Dispose of JedisPool resource.
jedisPool.close();
Problembehandlung
Aktivieren der Clientprotokollierung
Das Azure SDK für Java bietet einen konsistenten Protokollierungsverlauf, um die Behandlung von Anwendungsfehlern zu unterstützen und deren Lösung zu beschleunigen. Die erstellten Protokolle erfassen den Flow einer Anwendung, bevor sie den Endzustand erreichen. Dies trägt zur Ermittlung der Grundursache bei. Informationen zum Aktivieren der Protokollierung finden Sie im Protokollierungswiki.
Nächste Schritte
Beginnen Sie, indem Sie sich die Beispiele hier ansehen.
Mitwirken
Wenn Sie ein aktiver Mitwirkender zu diesem Projekt werden möchten, lesen Sie bitte unsere Beitragsrichtlinien für weitere Informationen.
Azure SDK for Java