Apache Kafka 用の Azure Event Hubs での Apache Flink の使用
このチュートリアルでは、プロトコル クライアントを変更したり、独自のクラスターを実行したりせずに、Apache Flink をイベント ハブに接続する方法について説明します。 Apache Kafka コンシューマー プロトコルの Event Hubs サポートの詳細については、Apache Kafka 用の Event Hubs に関するページを参照してください。
このチュートリアルでは、以下の内容を学習します。
- Event Hubs 名前空間を作成します
- サンプル プロジェクトを複製する
- Flink プロデューサーを実行する
- Flink コンシューマーを実行する
Note
このサンプルは GitHub で入手できます。
前提条件
このチュートリアルを完了するには、次の前提条件を満たしている必要があります。
- Apache Kafka 用の Event Hubs に関する記事を読む。
- Azure サブスクリプション。 お持ちでない場合は、開始する前に無料アカウントを作成してください。
- Java Development Kit (JDK) 1.7 以降
- Ubuntu で
apt-get install default-jdk
を実行して JDK をインストールします。 - 必ず、JDK のインストール先フォルダーを指すように JAVA_HOME 環境変数を設定してください。
- Ubuntu で
- Maven バイナリ アーカイブのダウンロードとインストール
- Ubuntu で
apt-get install maven
を実行して Maven をインストールします。
- Ubuntu で
- Git
- Ubuntu で
sudo apt-get install git
を実行して Git をインストールします。
- Ubuntu で
Event Hubs 名前空間を作成します
Event Hubs サービスとの間で送受信を行うには、Event Hubs 名前空間が必要です。 名前空間とイベント ハブを作成する手順については、イベント ハブの作成に関するページを参照してください。 後で使うので、イベント ハブの接続文字列をコピーしておきます。
サンプル プロジェクトを複製する
Event Hubs の接続文字列を入手したので、Kafka 用 Azure Event Hubs リポジトリをクローンし、flink
サブフォルダーに移動します。
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
Flink プロデューサーを実行する
提供された Flink プロデューサーの例を使用して、Event Hubs サービスにメッセージを送信します。
Event Hubs Kafka エンドポイントを指定する
producer.config
producer/src/main/resources/producer.config
の bootstrap.servers
値と sasl.jaas.config
値を更新し、正しい認証を使用してプロデューサーを Event Hubs Kafka エンドポイントに転送します。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
などがあります。
コマンド ラインからプロデューサーを実行する
コマンドラインからプロデューサーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
次にプロデューサーは、トピック test
にあるイベント ハブへのイベントの送信を開始し、それらのイベントを stdout に出力します。
Flink コンシューマーを実行する
提供されたコンシューマーの例を使用して、イベント ハブからメッセージを受信します。
Event Hubs Kafka エンドポイントを指定する
consumer.config
consumer/src/main/resources/consumer.config
の bootstrap.servers
値と sasl.jaas.config
値を更新し、正しい認証を使用してコンシューマーを Event Hubs Kafka エンドポイントに転送します。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
重要
{YOUR.EVENTHUBS.CONNECTION.STRING}
を Event Hubs 名前空間への接続文字列に置き換えます。 接続文字列を取得する手順については、「Event Hubs の接続文字列の取得」を参照してください。 構成の例には、sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
などがあります。
コマンド ラインからコンシューマーを実行する
コマンドラインからコンシューマーを実行するには、JAR を生成し、Maven 内から実行します (または、Maven を使用して JAR を生成し、必要な Kafka JAR をクラスパスに追加することによって、Java 内で実行します)。
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
Event Hubs にイベントがある場合 (たとえば、プロデューサーも実行されている場合)、コンシューマーはトピック test
からのイベントの受信を開始します。
Flink を Kafka に接続する方法についての詳細は、Flink の Kafka コネクタ ガイドを調べてください。
次のステップ
Kafka 用 Event Hubs の詳細については、次の記事を参照してください。