Freigeben über


Transaktionen in Apache Kafka für Azure Event Hubs

Dieser Artikel enthält ausführliche Informationen zur Verwendung der Transaktions-API in Apache Kafka mit Azure Event Hubs.

Übersicht

Event Hubs stellt einen Kafka-Endpunkt bereit, der von Ihren vorhandenen Kafka-Clientanwendungen als Alternative zum Betreiben eines eigenen Kafka-Clusters verwendet werden kann. Event Hubs funktioniert mit vielen Ihrer vorhandenen Kafka-Anwendungen. Weitere Informationen finden Sie unter Event Hubs für Apache Kafka.

Dieses Dokument konzentriert sich auf die nahtlose Verwendung der Kafka-Transaktions-API mit Azure Event Hubs.

Hinweis

Kafka Transactions befindet sich derzeit in der öffentlichen Vorschau der Ebenen Premium und Dedicated.

Transaktionen in Apache Kafka

In nativen Cloudumgebungen müssen Anwendungen für Netzwerkunterbrechungen sowie Neustarts und Upgrades von Namespaces resilient gemacht werden. Anwendungen, die strenge Verarbeitungsgarantien erfordern, müssen ein Transaktionsframework oder eine Transaktions-API verwenden, um sicherzustellen, dass entweder alle Vorgänge ausgeführt werden, oder keine, damit die Anwendung und der Datenstatus zuverlässig verwaltet werden. Wenn die Vorgänge fehlschlagen, können sie zuverlässig erneut automatisch versucht werden, um die richtigen Verarbeitungsgarantien sicherzustellen.

Hinweis

Transaktionsgarantien sind in der Regel erforderlich, wenn es mehrere Vorgänge gibt, die nach dem Modus „alles oder nichts“ verarbeitet werden müssen.

Für alle anderen Vorgänge sind Clientanwendungen standardmäßig resilient, sodass der Vorgang mit einem exponentiellen Backoff erneut versucht wird, wenn der jeweilige Vorgang fehlgeschlagen ist.

Apache Kafka stellt eine Transaktions-API bereit, um diese Verarbeitungsebene für die gleichen oder verschiedene Themen/Partitionen sicherzustellen.

Transaktionen gelten für die folgenden Fälle:

  • Transaktionsproducer.
  • Exactly-Once-Verarbeitungssemantik.

Transaktionsproducer

Transaktionsproducer stellen sicher, dass Daten automatisch in mehrere Partitionen in verschiedenen Themen geschrieben werden. Producer können eine Transaktion initiieren, in mehrere Partitionen im selben Thema oder in verschiedene Themen schreiben und dann die Transaktion committen oder abbrechen.

Um sicherzustellen, dass ein Producer transaktional ist, sollte enable.idempotence auf „true“" festgelegt werden, um sicherzustellen, dass die Daten genau einmal geschrieben werden, sodass Duplikate auf der Sendeseite vermieden werden. Darüber hinaus sollte transaction.id festgelegt werden, um den Producer eindeutig zu identifizieren.

    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<String, String> producer = new KafkaProducer(producerProps);

Sobald der Producer initialisiert wurde, stellt der folgende Aufruf sicher, dass sich der Producer beim Broker als Transaktionsproducer registriert:

    producer.initTransactions();

Der Producer muss dann eine Transaktion explizit starten, Sendevorgänge in verschiedenen Themen und Partitionen wie üblich ausführen und dann die Transaktion mit dem folgenden Aufruf committen:

    producer.beginTransaction();
	/*
        Send to multiple topic partitions.
    */
    producer.commitTransaction();

Wenn die Transaktion aufgrund eines Fehlers oder eines Timeouts abgebrochen werden muss, kann der Producer die Methode abortTransaction() aufrufen.

	producer.abortTransaction();

Exactly-Once-Semantik

Die Exactly-Once-Semantik basiert auf den Transaktionsproducern, indem Consumer zum Transaktionsumfang der Producer hinzugefügt werden, sodass jeder Datensatz garantiert genau einmal gelesen, verarbeitet und geschrieben wird.

Zuerst wird der Transaktionsproducer instanziiert:


    producerProps.put("enable.idempotence", "true");
    producerProps.put("transactional.id", "transactional-producer-1");
    KafkaProducer<K, V> producer = new KafkaProducer(producerProps);

    producer.initTransactions();

Anschließend muss der Consumer so konfiguriert werden, dass nur Nicht-Transaktionsnachrichten oder Transaktionsnachrichten mit Commit gelesen werden, indem die folgende Eigenschaft festgelegt wird:


	consumerProps.put("isolation.level", "read_committed");
	KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);

Sobald der Consumer instanziiert wurde, kann er das Thema abonnieren, in dem die Datensätze gelesen werden müssen:


    consumer.subscribe(singleton("inputTopic"));

Nachdem der Consumer die Datensätze aus dem Eingabethema abgefragt hat, startet der Producer den Transaktionsbereich, in dem der Datensatz verarbeitet und in das Ausgabethema geschrieben wird. Sobald die Datensätze geschrieben wurden, wird die aktualisierte Zuordnung von Offsets für alle Partitionen erstellt. Der Producer sendet dann diese aktualisierte Offsetzuordnung an die Transaktion, bevor ein Commit für die Transaktion ausgeführt wird.

Bei einer Ausnahme wird die Transaktion abgebrochen, und der Producer wiederholt die Verarbeitung erneut automatisch.

	while (true) {
		ConsumerRecords records = consumer.poll(Long.Max_VALUE);
		producer.beginTransaction();
        try {
    		for (ConsumerRecord record : records) {
    			/*
                    Process record as appropriate
                */
                // Write to output topic
    	        producer.send(producerRecord(“outputTopic”, record));
    		}
    
            /*
                Generate the offset map to be committed.
            */
            Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
            for (TopicPartition partition : records.partitions()) {
                // Calculate the offset to commit and populate the map.
                offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
            }
            
            // send offsets to transaction and then commit the transaction.
    		producer.sendOffsetsToTransaction(offsetsToCommit, group);
    		producer.commitTransaction();
        } catch (Exception e)
        {
            producer.abortTransaction();
        }
	}

Warnung

Wenn die Transaktion vor max.transaction.timeout.ms weder committet noch abgebrochen wird, wird die Transaktion von Event Hubs automatisch abgebrochen. Der Standardwert für max.transaction.timeout.ms ist von Event Hubs auf 15 Minuten festgelegt, der Producer kann ihn jedoch mit einem niedrigeren Wert überschreiben, indem die Eigenschaft transaction.timeout.ms in den Konfigurationseigenschaften des Producers festgelegt wird.

Migrationshandbuch

Wenn Sie über vorhandene Kafka-Anwendungen verfügen, die Sie mit Azure Event Hubs verwenden möchten, lesen Sie das Kafka-Migrationshandbuch für Azure Event Hubs, um schnell beginnen zu können.

Nächste Schritte

Weitere Informationen zu Event Hubs und Event Hubs für Kafka finden Sie in folgenden Artikeln: