Integrare il supporto di Apache Kafka Connect in Hub eventi di Azure
Apache Kafka Connect è un framework che consente di connettere e importare/esportare dati da/a qualsiasi sistema esterno come MySQL, HDFS e il file system tramite un cluster Kafka. Questo articolo illustra in dettaglio come usare il framework di Kafka Connect con Hub eventi.
L’articolo mostra come integrare Kafka Connect con un hub eventi e come distribuire i connettori di base FileStreamSource
e FileStreamSink
. Questi connettori non sono destinati all'uso in ambiente di produzione, ma servono semplicemente a dimostrare uno scenario completo di Kafka Connect in cui il servizio Hub eventi di Azure funge da broker Kafka.
Nota
Questo esempio è disponibile in GitHub.
Prerequisiti
Per completare questa procedura dettagliata, verificare di disporre dei prerequisiti seguenti:
- Sottoscrizione di Azure. Se non se ne ha una, creare un account gratuito.
- Git
- Linux/macOS
- Versione più recente di Kafka disponibile da kafka.apache.org
- Leggere con attenzione l'articolo introduttivo Hub eventi per Apache Kafka
Creare uno spazio dei nomi di Hub eventi
Per l'invio e la ricezione da qualsiasi servizio Hub eventi è richiesto uno spazio dei nomi di Hub eventi. Vedere Creazione di un hub eventi per istruzioni su come creare uno spazio dei nomi e un hub eventi. Ottenere la stringa di connessione di Hub eventi e il nome di dominio completo (FQDN) da usare successivamente. Per istruzioni, vedere Ottenere una stringa di connessione ad Hub eventi.
Clonare il progetto di esempio
Clonare il repository di Hub eventi di Azure e passare alla sottocartella tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configurare Kafka Connect per Hub eventi
Quando si reindirizza la velocità effettiva di Kafka Connect da Kafka a Hub eventi, è necessaria una riconfigurazione minima. Il codice di esempio connect-distributed.properties
seguente illustra come configurare Connect per autenticare e comunicare con l'endpoint Kafka in Hub eventi:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Importante
Sostituire {YOUR.EVENTHUBS.CONNECTION.STRING}
con la stringa di connessione per lo spazio dei nomi di Hub eventi. Per istruzioni su come ottenere la stringa di connessione, vedere Ottenere una stringa di connessione ad Hub eventi. Ecco un esempio di configurazione: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Eseguire Kafka Connect
In questo passaggio un ruolo di lavoro Kafka Connect viene avviato localmente in modalità distribuita, usando Hub eventi per gestire lo stato del cluster.
- Salvare il file in locale
connect-distributed.properties
. Assicurarsi di sostituire tutti i valori racchiusi tra parentesi graffe. - Passare alla posizione della versione di Kafka nel computer.
- Eseguire
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. Quando viene visualizzato l'avviso'INFO Finished starting connectors and tasks'
, l'API REST del ruolo di lavoro Connect è pronta per l'interazione.
Nota
Kafka Connect usa l'API Kafka AdminClient per creare automaticamente argomenti con le configurazioni consigliate, inclusa la compattazione. Un rapido controllo dello spazio dei nomi nel portale di Azure conforma che gli argomenti interni del ruolo di lavoro Connect sono stati creati automaticamente.
Gli argomenti interni di Kafka Connect devono usare la compattazione. Il team di Hub eventi non è responsabile della correzione di configurazioni non corrette se gli argomenti interni di Kafka Connect non sono configurati correttamente.
Creare i connettori
Questa sezione illustra in modo dettagliato i connettori FileStreamSource
e FileStreamSink
.
Creare una directory per i file di dati di input e output.
mkdir ~/connect-quickstart
Creare due file: uno con i dati di inizializzazione che vengono letti dal connettore
FileStreamSource
e l'altro in cui il connettoreFileStreamSink
scrive.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Creare un connettore
FileStreamSource
. Assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Dopo l'esecuzione del comando si dovrebbe vedere il comando
connect-quickstart
dell'Hub eventi nell'istanza di Hub eventi.Controllare lo stato del connettore di origine.
curl -s http://localhost:8083/connectors/file-source/status
Facoltativamente, è possibile usare Service Bus Explorer per verificare che gli eventi abbiamo raggiunto l'argomento
connect-quickstart
.Creare un connettore FileStreamSink. Anche in questo caso, assicurarsi di sostituire i valori nelle parentesi graffe con il percorso della home directory.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Controllare lo stato del connettore sink.
curl -s http://localhost:8083/connectors/file-sink/status
Verificare che i dati siano stati replicati tra i file e che siano identici in entrambi i file.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Pulizia
Kafka Connect crea argomenti di Hub eventi in cui archiviare le configurazioni, gli offset e lo stato che rimangono persistenti anche dopo che il cluster Connect viene disattivato. A meno che non si desideri questa persistenza, è consigliabile eliminare questi argomenti. Si potrebbe anche desiderare di eliminare il codice connect-quickstart
di Hub eventi creato nel corso della procedura dettagliata.
Contenuto correlato
Per altre informazioni su Hub eventi di Azure per Kafka, vedere gli articoli seguenti: