Поделиться через


Использование Akka Streams с Центрами событий для Apache Kafka

В этом руководстве показано, как подключить потоки Akka через поддержку концентраторов событий для Apache Kafka без изменения клиентов протокола или запуска собственных кластеров.

В этом руководстве описано следующее:

  • Создание пространства имен в Центрах событий Azure
  • Клонирование примера проекта
  • Запуск производителя Akka Streams
  • Запуск потребителя Akka Streams

Примечание.

Этот пример можно найти на сайте GitHub.

Необходимые компоненты

Для работы с этим руководством выполните следующие предварительные требования:

  • Прочтите статью Центры событий Azure для Apache Kafka.
  • Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начать работу.
  • Комплект разработчика Java (JDK) 1.8+.
    • В Ubuntu выполните команду apt-get install default-jdk, чтобы установить JDK.
    • Обязательно настройте переменную среды JAVA_HOME так, чтобы она указывала на папку, в которой установлен пакет JDK.
  • Скачивание и установка двоичного архива Maven
    • В Ubuntu выполните команду apt-get install maven, чтобы установить Maven.
  • Git
    • В Ubuntu выполните команду sudo apt-get install git, чтобы установить Git.

Создание пространства имен в Центрах событий Azure

Для отправки и получения данных из любой службы концентраторов событий требуется пространство имен концентраторов событий. См. раздел Создание концентратора событий для получения подробной информации. Скопируйте строку подключения к Центрам событий для дальнейшего использования.

Клонирование примера проекта

Теперь, когда у вас есть строка подключения к Центрам событий, клонируйте репозиторий Центров событий Azure для Kafka и перейдите в нем к подпапке akka:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

Запуск производителя Akka Streams

Используя предоставленный пример производителя Akka Streams, отправьте сообщения в службу Центров событий.

Предоставление конечной точки Kafka в Центрах событий

application.conf производителя

Обновите значения bootstrap.servers и sasl.jaas.config в producer/src/main/resources/application.conf, чтобы перенаправить производителя на конечную точку Kafka Центров событий с правильной аутентификацией.

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

Внимание

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Запуск производителя из командной строки

Чтобы запустить производителя из командной строки, создайте JAR-файл, а затем запустите его из Maven (или создайте JAR-файл с помощью Maven, а затем запустите в Java, добавив необходимые файлы архива Kafka Java (JARs) в классpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

Производитель начинает отправлять события в концентратор событий в теме test и печатает события в стандартный вывод.

Запуск потребителя Akka Streams

Используя предоставленный пример потребителя, получите сообщения от концентратора событий.

Предоставление конечной точки Kafka в Центрах событий

application.conf потребителя

Обновите значения bootstrap.servers и sasl.jaas.config в consumer/src/main/resources/application.conf, чтобы перенаправить потребителя на конечную точку Kafka в Центрах событий с правильной аутентификацией.

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}

Внимание

Замените {YOUR.EVENTHUBS.CONNECTION.STRING} строками подключения для вашего пространства имен Центров событий. Инструкции по получению строки подключения см. в статье Получение строки подключения Центров событий. Пример конфигурации см. здесь: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

Запуск потребителя из командной строки

Чтобы запустить потребитель из командной строки, создайте JAR-файл и запустите его из Maven (или создайте JAR-файл с помощью Maven, а затем запустите в Java, добавив необходимые JAR Kafka в классpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

Если в концентраторе событий есть события (например, если ваш продюсер также работает), то потребитель начинает получать события из темы test.

Дополнительные сведения об Akka Streams см. в руководстве по Akka Streams Kafka.

Следующие шаги

Дополнительные сведения о концентраторах событий для Kafka см. в следующих статьях: