다음을 통해 공유


HDInsight on AKS에서 Apache Flink®와 함께 HDInsight에서 Apache Kafka® 사용

참고 항목

2025년 1월 31일에 Azure HDInsight on AKS가 사용 중지됩니다. 2025년 1월 31일 이전에 워크로드가 갑자기 종료되지 않도록 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 마이그레이션해야 합니다. 구독의 나머지 클러스터는 호스트에서 중지되고 제거됩니다.

사용 중지 날짜까지 기본 지원만 사용할 수 있습니다.

Important

이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 AskHDInsight에서 세부 정보와 함께 요청을 제출하고 Azure HDInsight 커뮤니티에서 더 많은 업데이트를 확인하세요.

Apache Flink의 잘 알려진 사용 사례는 스트림 분석입니다. Apache Kafka를 사용하여 수집되는 데이터 스트림을 사용하기 위해 많은 사용자가 선택하는 인기 있는 선택입니다. Flink와 Kafka의 일반적인 설치는 이벤트 스트림이 Kafka로 푸시되는 것으로 시작되며 이는 Flink 작업에서 사용될 수 있습니다.

이 예제에서는 Flink 1.17.0을 실행하는 HDInsight on AKS 클러스터를 사용하여 Kafka 항목을 사용하고 생성하는 스트리밍 데이터를 처리합니다.

참고 항목

FlinkKafkaConsumer는 사용 중지되었으며 Flink 1.17에서 제거됩니다. 대신 KafkaSource를 사용하세요. FlinkKafkaProducer는 사용 중지되었으며 Flink 1.15에서 제거됩니다. 대신 KafkaSink를 사용하세요.

필수 조건

  • Kafka와 Flink는 모두 동일한 VNet에 있거나 두 클러스터 간에 vnet 피어링이 있어야 합니다.

  • VNet 만들기

  • 동일한 VNet에 Kafka 클러스터를 만듭니다. 현재 사용량에 따라 HDInsight에서 Kafka 3.2 또는 2.4를 선택할 수 있습니다.

    동일한 VNet에서 Kafka 클러스터를 만드는 방법을 보여 주는 스크린샷

  • 가상 네트워크 섹션에서 VNet 세부 정보를 추가합니다.

  • 동일한 VNet을 사용하여 HDInsight on AKS 클러스터 풀을 만듭니다.

  • 만든 클러스터 풀에 대한 Flink 클러스터를 만듭니다.

Apache Kafka 커넥터

Flink는 정확히 한 번의 보장으로 Kafka 항목에서 데이터를 읽고 Kafka 항목에 쓸 수 있는 Apache Kafka 커넥터를 제공합니다.

Maven 종속성

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Kafka 싱크 빌드

Kafka 싱크는 KafkaSink의 인스턴스를 구성하는 작성기 클래스를 제공합니다. 동일한 방법을 사용하여 싱크를 구성하고 HDInsight on AKS에서 실행되는 Flink 클러스터와 함께 사용합니다.

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Java 프로그램 Event.java 작성

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

Webssh에서 jar를 업로드하고 jar를 제출합니다.

Flink에서 실행되는 작업을 보여 주는 스크린샷

Flink 대시보드 UI에서

Kafka 토픽 패키지 jar를 작업으로 Flink에 제출하는 방법을 보여 주는 스크린샷

항목 생성 - Kafka 클릭

Kafka 토픽을 생성하는 방법을 보여 주는 스크린샷

항목 사용 - Kafka의 이벤트

Kafka 토픽을 사용하는 방법을 보여 주는 스크린샷

참조