Partilhar via


Usando o Apache Kafka® no HDInsight com o Apache Flink® no HDInsight no AKS

Nota

Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.

Apenas o apoio básico estará disponível até à data da reforma.

Importante

Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.

Um caso de uso bem conhecido para o Apache Flink é a análise de fluxo. A escolha popular por muitos usuários para usar os fluxos de dados, que são ingeridos usando Apache Kafka. As instalações típicas de Flink e Kafka começam com fluxos de eventos sendo empurrados para Kafka, que podem ser consumidos por trabalhos Flink.

Este exemplo usa o HDInsight em clusters AKS que executam o Flink 1.17.0 para processar o consumo de dados de streaming e a produção do tópico Kafka.

Nota

FlinkKafkaConsumer foi preterido e será removido com Flink 1.17, use KafkaSource em vez disso. FlinkKafkaProducer foi preterido e será removido com Flink 1.15, use KafkaSink em vez disso.

Pré-requisitos

  • Tanto Kafka quanto Flink precisam estar na mesma VNet ou deve haver vnet-peering entre os dois clusters.

  • Criação de VNet.

  • Crie um cluster Kafka na mesma rede virtual. Você pode escolher Kafka 3.2 ou 2.4 no HDInsight com base no seu uso atual.

    Captura de tela mostrando como criar um cluster Kafka na mesma rede virtual.

  • Adicione os detalhes da rede virtual na seção de rede virtual.

  • Crie um HDInsight no pool de clusters AKS com a mesma rede virtual.

  • Crie um cluster Flink para o pool de clusters criado.

Conector Apache Kafka

Flink fornece um Apache Kafka Connector para ler dados e gravar dados em tópicos Kafka com exatamente uma garantia.

Dependência do Maven

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

Edifício Kafka Sink

Kafka sink fornece uma classe builder para construir uma instância de um KafkaSink. Usamos o mesmo para construir nosso Sink e usá-lo junto com o cluster Flink em execução no HDInsight no AKS

SinKafkaToKafka.java

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinKafkaToKafka {
    public static void main(String[] args) throws Exception {
        // 1. get stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. read kafka message as stream input, update your broker IPs below
        String brokers = "X.X.X.X:9092,X.X.X.X:9092,X.X.X.X:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("clicks")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        
        // 3. transformation: 
        // https://www.taobao.com,1000 ---> 
        // Event{user: "Tim",url: "https://www.taobao.com",timestamp: 1970-01-01 00:00:01.0}
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 4. sink click into another kafka events topic
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setProperty("transaction.timeout.ms","900000")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("events")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .build();

        result.sinkTo(sink);

       // 5. execute the stream
        env.execute("kafka Sink to other topic");
    }
}

Escrevendo um programa Java Event.java

import java.sql.Timestamp;

public class Event {

    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user,String url,Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString(){
        return "Event{" +
                "user: \"" + user + "\""  +
                ",url: \"" + url + "\""  +
                ",timestamp: " + new Timestamp(timestamp) +
                "}";
    }
}

No Webssh, carregue o frasco e envie o frasco

Captura de tela mostrando o trabalho em execução no Flink.

Na interface do usuário do painel Flink

Captura de tela mostrando como enviar o jar empacotado do tópico Kafka como um trabalho para o Flink.

Produza o tópico - cliques em Kafka

Captura de tela mostrando como produzir o tópico Kafka.

Consuma o tema - eventos sobre Kafka

Captura de tela mostrando como consumir o tópico Kafka.

Referência