Integración de la compatibilidad con Apache Kafka Connect en Azure Event Hubs (versión preliminar)
Apache Kafka Connect es un marco de trabajo para conectar e importar o exportar datos desde y hacia cualquier sistema externo, como MySQL, HDFS y el sistema de archivos mediante un clúster de Kafka. Este artículo le guiará por el uso del marco Kafka Connect con Event Hubs.
Este artículo le guía por la integración de Kafka Connect con un centro de eventos y la implementación de conectores básicos FileStreamSource
y FileStreamSink
. Aunque estos conectores no están pensados para su uso en la producción, muestran un escenario de un extremo a otro de Kafka Connect en el que Azure Event Hubs actúa como un agente de Kafka.
Nota:
Este ejemplo está disponible en GitHub.
Requisitos previos
Para completar este tutorial, asegúrese de cumplir estos requisitos previos:
- Suscripción de Azure. En caso de no tener ninguna, cree una cuenta gratuita.
- Git
- Linux/macOS
- Última versión de Kafka disponible en kafka.apache.org
- Lea el artículo de introducción Event Hubs para Apache Kafka.
Creación de un espacio de nombres de Event Hubs
Se requiere un espacio de nombres de Event Hubs para enviar y recibir de cualquier servicio de Event Hubs. Consulte Creación de un centro de eventos para obtener instrucciones sobre cómo crear un espacio de nombres y un centro de eventos. Obtenga la cadena de conexión de Event Hubs y el nombre de dominio completo (FQDN) para su uso posterior. Para obtener instrucciones, consulte Get an Event Hubs connection string (Obtención de una cadena de conexión de Event Hubs).
Clonación del proyecto de ejemplo
Clone el repositorio de Azure Event Hubs y vaya a la carpeta tutorials/connect:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
Configuración de Kafka Connect para Event Hubs
Es necesaria una reconfiguración mínima cuando se redirige el rendimiento de Kafka Connect desde Kafka a Event Hubs. En el siguiente ejemplo connect-distributed.properties
se muestra cómo configurar Connect para autenticar y comunicarse con el punto de conexión de Kafka en Event Hubs:
# e.g. namespace.servicebus.windows.net:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
# path to the libs directory within the Kafka release
plugin.path={KAFKA.DIRECTORY}/libs
Importante
Reemplace {YOUR.EVENTHUBS.CONNECTION.STRING}
por la cadena de conexión para el espacio de nombres de Event Hubs. Para obtener instrucciones sobre cómo obtener la cadena de conexión, consulte Obtención de una cadena de conexión de Event Hubs. A continuación se muestra un ejemplo de configuración: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
Ejecución de Kafka Connect
En este paso, un trabajo de Kafka Connect se inicia localmente en modo distribuido, con Event Hubs para mantener el estado del clúster.
- Guarde el archivo
connect-distributed.properties
en el entorno local. Asegúrese de reemplazar todos los valores entre llaves. - Vaya a la ubicación de la versión de Kafka en la máquina.
- Ejecute
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
. La API REST del trabajo de Connect está lista para interactuar cuando vea'INFO Finished starting connectors and tasks'
.
Nota:
Kafka Connect utiliza Kafka AdminClient API para crear temas automáticamente con configuraciones recomendadas, incluida la compactación. Una rápida comprobación del espacio de nombres en Azure Portal revela que los temas internos del trabajo de Connect se han creado automáticamente.
Los temas internos de Kafka Connect deben usar compactación. El equipo de Event Hubs no es responsable de corregir configuraciones incorrectas si los temas internos de Connect no están configurados correctamente.
Creación de conectores
En esta sección se explica la puesta en marcha y los conectores FileStreamSource
y FileStreamSink
.
Cree un directorio para archivos de datos de entrada y salida.
mkdir ~/connect-quickstart
Cree dos archivos: un archivo con datos de inicialización de los que lee el conector
FileStreamSource
y otro en el que escribe nuestro conectorFileStreamSink
.seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
Cree un conector
FileStreamSource
. Asegúrese de reemplazar las llaves por la ruta de acceso del directorio particular.curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
Debería ver el centro de eventos
connect-quickstart
en la instancia de Event Hubs después de ejecutar el comando.Compruebe el estado del conector de origen.
curl -s http://localhost:8083/connectors/file-source/status
Si lo desea, puede usar Service Bus Explorer para comprobar que los eventos llegaron al tema
connect-quickstart
.Cree un conector FileStreamSink. Una vez más, asegúrese de reemplazar las llaves por la ruta de acceso del directorio particular.
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
Compruebe el estado del conector del receptor.
curl -s http://localhost:8083/connectors/file-sink/status
Compruebe que se han replicado los datos entre los archivos y que son idénticos en ambos archivos.
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
Limpieza
Kafka Connect crea temas del centro de eventos para almacenar las configuraciones, los desplazamientos y los estados que persisten incluso después de que el clúster de Connect se haya desactivado. A menos que se desee esta persistencia, se recomienda eliminar estos temas. También se recomienda eliminar los connect-quickstart
Event Hubs que se crearon durante este tutorial.
Contenido relacionado
Para obtener más información acerca de Event Hubs para Kafka, consulte los artículos siguientes: