Freigeben über


Integrieren der Apache Kafka Connect-Unterstützung in Azure Event Hubs

Apache Kafka Connect ist ein Framework zum Verbinden und Importieren/Exportieren von Daten aus einem beliebigen externen System bzw. in ein beliebiges externes System wie MySQL, HDFS und ein Dateisystem über einen Kafka-Cluster. In diesem Artikel wird die Nutzung eines Kafka Connect-Frameworks mit Event Hubs Schritt für Schritt beschrieben.

Dieser Artikel führt Sie schrittweise durch die Vorgehensweise zum Integrieren von Kafka Connect in einen Event Hub und zum Bereitstellen von einfachen FileStreamSource- und FileStreamSink-Connectors. Die Connectors sind zwar nicht für die Verwendung in der Produktion bestimmt, aber sie stellen ein End-to-End-Szenario für Kafka Connect dar, bei dem Azure Event Hubs als Kafka-Broker fungiert.

Hinweis

Dieses Beispiel ist auf GitHub verfügbar.

Voraussetzungen

Stellen Sie für das Durcharbeiten dieses Tutorials zur Vorgehensweise sicher, dass die folgenden Voraussetzungen erfüllt sind:

Erstellen eines Event Hubs-Namespace

Ein Event Hubs-Namespace ist erforderlich, um Nachrichten an einen Event Hubs-Dienst zu senden und von diesem zu empfangen. Anweisungen zum Erstellen eines Namespace und eines Event Hub finden Sie unter Erstellen eines Event Hubs. Rufen Sie die Event Hubs-Verbindungszeichenfolge und den vollqualifizierten Domänennamen (Fully Qualified Domain Name, FQDN) zur späteren Verwendung ab. Anweisungen hierzu finden Sie unter Get an Event Hubs connection string (Abrufen einer Event Hubs-Verbindungszeichenfolge).

Klonen des Beispielprojekts

Klonen Sie das Azure Event Hubs-Repository, und navigieren Sie zum Unterordner „tutorials/connect“:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Konfigurieren von Kafka Connect für Event Hubs

Es ist nur eine minimale Neukonfiguration erforderlich, wenn Sie den Kafka Connect-Durchsatz von Kafka an Event Hubs umleiten. Im folgenden Beispiel connect-distributed.properties ist dargestellt, wie Sie Connect konfigurieren, um die Authentifizierung und Kommunikation mit dem Kafka-Endpunkt unter Event Hubs einzurichten:

# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs 

Wichtig

Ersetzen Sie {YOUR.EVENTHUBS.CONNECTION.STRING} durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace. Anweisungen zum Abrufen der Verbindungszeichenfolge finden Sie unter Abrufen einer Event Hubs-Verbindungszeichenfolge. Hier sehen Sie eine Beispielkonfiguration: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Ausführen von Kafka Connect

In diesem Schritt wird ein Kafka Connect-Worker lokal im verteilten Modus gestartet, indem Event Hubs zum Warten des Clusterzustands verwendet wird.

  1. Speichern Sie die Datei connect-distributed.properties lokal. Achten Sie darauf, alle Werte in geschweiften Klammern zu ersetzen.
  2. Navigieren Sie zum Speicherort des Kafka-Release auf Ihrem Computer.
  3. Führen Sie ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties aus. Die Connect-Worker-REST-API ist für die Interaktion bereit, wenn 'INFO Finished starting connectors and tasks' angezeigt wird.

Hinweis

Kafka Connect verwendet die Kafka-AdminClient-API, um automatisch Themen mit empfohlenen Konfigurationen, einschließlich Komprimierung, zu erstellen. Eine schnelle Überprüfung des Namespace im Azure-Portal ergibt, dass die internen Themen des Connect-Workers automatisch erstellt wurden.

Interne Kafka Connect-Themen müssen Komprimierung verwenden. Das Event Hubs-Team ist nicht für die Korrektur falscher Konfigurationen zuständig, sollten interne Connect-Themen nicht ordnungsgemäß konfiguriert sein.

Erstellen von Connectors

In diesem Abschnitt wird das Hochfahren von FileStreamSource- und FileStreamSink-Connectors erläutert.

  1. Erstellen Sie ein Verzeichnis für Ein- und Ausgabedatendateien.

    mkdir ~/connect-quickstart
    
  2. Erstellen Sie zwei Dateien: eine Datei mit Startdaten, aus der der FileStreamSource-Connector liest, und eine weitere Datei, in die der FileStreamSink-Connector schreibt.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Erstellen Sie einen FileStreamSource-Connector. Achten Sie darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    In Ihrer Event Hubs-Instanz sollte nach dem Ausführen des Befehls der Event Hub connect-quickstart angezeigt werden.

  4. Überprüfen Sie den Status des Quellconnectors.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Optional können Sie Service Bus Explorer nutzen, um zu überprüfen, ob Ereignisse im Thema connect-quickstart eingegangen sind.

  5. Erstellen Sie einen FileStreamSink-Connector. Achten Sie auch hier wieder darauf, dass Sie den Inhalt der geschweiften Klammern durch den Pfad Ihres Basisverzeichnisses ersetzen.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Überprüfen Sie den Status des Senkenconnectors.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Stellen Sie sicher, dass die Daten zwischen den Dateien repliziert wurden und in beiden Dateien identisch sind.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

Cleanup

Mit Kafka Connect werden Event Hub-Themen zum Speichern von Konfigurationen, Offsets und des Status erstellt, die auch dann beibehalten werden, nachdem der Connect-Cluster heruntergefahren wurde. Sofern diese Persistenz nicht gewünscht ist, empfehlen wir, diese Themen zu löschen. Sie können auch die Event Hubs-Instanzen vom Typ connect-quickstart löschen, die in dieser exemplarischen Vorgehensweise erstellt wurden.

Weitere Informationen zu Event Hubs für Kafka finden Sie in folgenden Artikeln: