共用方式為


將 Apache Kafka® on HDInsight 與 Apache Flink® on HDInsight on AKS 結合使用

注意

AKS 上的 Azure HDInsight 將於 2025 年 1 月 31 日退場。 請於 2025 年 1 月 31 日之前,將工作負載移轉至 Microsoft Fabric 或對等的 Azure 產品,以免工作負載突然終止。 訂用帳戶中剩餘的叢集將會停止,並會從主機移除。

在淘汰日期之前,只有基本支援可用。

重要

此功能目前為預覽功能。 Microsoft Azure 預覽版增補使用規定包含適用於 Azure 功能 (搶鮮版 (Beta)、預覽版,或尚未正式發行的版本) 的更多法律條款。 若需此特定預覽版的相關資訊,請參閱 Azure HDInsight on AKS 預覽版資訊。 如有問題或功能建議,請在 AskHDInsight 上提交要求並附上詳細資料,並且在 Azure HDInsight 社群上追蹤我們以獲得更多更新資訊。

Apache Flink 的已知使用案例是串流分析。 許多使用者使用資料流的熱門選擇,會使用 Apache Kafka 來擷取這些資料流。 Flink 和 Kafka 的一般安裝會從正在推送至 Kafka 的事件串流開始,然後由 Flink 作業使用。

此範例使用執行 Flink 1.17.0 的 HDInsight on AKS 叢集來處理取用及產生 Kafka 主題的串流資料。

注意

FlinkKafkaConsumer 已被取代,並將在 Flink 1.17 中移除,請改用 KafkaSource。 FlinkKafkaProducer 已被取代,並將在 Flink 1.15 中移除,請改用 KafkaSink。

必要條件

  • Kafka 和 Flink 需要位於同一個 VNet 中,或兩個叢集之間應該有 vnet 對等互連。

  • 建立 VNet

  • 在同一個 VNet 中建立 Kafka 叢集。 您可以根據目前的使用情況在 HDInsight 上選擇 Kafka 3.2 或 2.4。

    顯示如何在相同的 VNet 中建立 Kafka 叢集的螢幕擷取畫面。

  • 在虛擬網路區段中新增 VNet 詳細資料。

  • 在同一個 VNet 中建立 HDInsight on AKS 叢集集區

  • 對所建立的叢集集區中,建立 Flink 叢集。

Apache Kafka 連接器

Flink 提供了一個 Apache Kafka 連接器,用於從 Kafka 主題讀取資料,以及將資料寫入 Kafka 主題,並具有剛好一次的保證。

Maven 相依性

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

建置 Kafka Sink

Kafka Sink 提供了一個建置器類別來建構 KafkaSink 的執行個體。 我們使用相同的方法來建構我們的 Sink 並將其與在 HDInsight on AKS 上執行的 Flink 叢集一起使用

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

撰寫 Java 程式 Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

在 Webssh 上,上傳 jar 並加以提交

此螢幕擷取畫面顯示在 Flink 上執行的作業。

在 Flink 儀表板 UI 上

顯示如何將封裝了 jar 的 Kafka 主題以作業的形式提交至 Flink 的螢幕擷取畫面。

產生主題 - 按一下 Kafka

顯示如何產生 Kafka 主題的螢幕擷取畫面。

取用主題 - Kafka 上的事件

顯示如何取用 Kafka 主題的螢幕擷取畫面。

參考