將 Apache Kafka® on HDInsight 與 Apache Flink® on HDInsight on AKS 結合使用
注意
AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。
在淘汰日期之前,只有基本支援可用。
重要
此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。
Apache Flink 的已知使用案例是串流分析。 許多使用者使用資料流的熱門選擇,會使用 Apache Kafka 來擷取這些資料流。 Flink 和 Kafka 的一般安裝會從正在推送至 Kafka 的事件串流開始,然後由 Flink 作業使用。
此範例使用執行 Flink 1.17.0 的 HDInsight on AKS 叢集來處理取用及產生 Kafka 主題的串流資料。
注意
FlinkKafkaConsumer 已被取代,並將在 Flink 1.17 中移除,請改用 KafkaSource。 FlinkKafkaProducer 已被取代,並將在 Flink 1.15 中移除,請改用 KafkaSink。
必要條件
Kafka 和 Flink 需要位於同一個 VNet 中,或兩個叢集之間應該有 vnet 對等互連。
在同一個 VNet 中建立 Kafka 叢集。 您可以根據目前的使用情況在 HDInsight 上選擇 Kafka 3.2 或 2.4。
在虛擬網路區段中新增 VNet 詳細資料。
在同一個 VNet 中建立 HDInsight on AKS 叢集集區。
對所建立的叢集集區中,建立 Flink 叢集。
Apache Kafka 連接器
Flink 提供了一個 Apache Kafka 連接器,用於從 Kafka 主題讀取資料,以及將資料寫入 Kafka 主題,並具有剛好一次的保證。
Maven 相依性
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.0</version>
</dependency>
建置 Kafka Sink
Kafka Sink 提供了一個建置器類別來建構 KafkaSink 的執行個體。 我們使用相同的方法來建構我們的 Sink 並將其與在 HDInsight on AKS 上執行的 Flink 叢集一起使用
SinKafkaToKafka.java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinKafkaToKafka {
public static void main(String[] args) throws Exception {
// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. read kafka message as stream input, update your broker IPs below
String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("clicks")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 3. transformation:
// https://www.taobao.com,1000 --->
// Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
}
});
// 4. sink click into another kafka events topic
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setProperty("transaction.timeout.ms","900000")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("events")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
result.sinkTo(sink);
// 5. execute the stream
env.execute("kafka Sink to other topic");
}
}
撰寫 Java 程式 Event.java
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user,String url,Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString(){
return "Event{" +
"user: \"" + user + "\"" +
",url: \"" + url + "\"" +
",timestamp: " + new Timestamp(timestamp) +
"}";
}
}
封裝該 jar 檔並將作業提交至 Flink
在 Webssh 上,上傳 jar 並加以提交
在 Flink 儀表板 UI 上
產生主題 - 按一下 Kafka
取用主題 - Kafka 上的事件
參考
- Apache Kafka 連接器
- Apache、Apache Kafka、Kafka、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 Apache Software Foundation (ASF) 的商標。