Compartilhar via


Ingerir dados do Apache Kafka no Azure Data Explorer

O Apache Kafka é uma plataforma de streaming distribuída para a criação de pipelines de dados de fluxo em tempo real, os quais movem os dados entre sistemas ou aplicativos de modo confiável. O Kafka Connect é uma ferramenta para streaming de dados escalonável e confiável entre o Apache Kafka e outros sistemas. O Coletor Kusto Kafka serve como o conector do Kafka e não requer o uso de código. Faça download do jar do conector do coletor do Repositório git ou Hub do Conector Confluent.

Este artigo mostra como ingerir dados com o Kafka, usando uma configuração autônoma do Docker para simplificar o cluster Kafka e a instalação do cluster do conector Kafka.

Para obter mais informações, consulte o Repositório git do conector e as especificações de versão.

Pré-requisitos

Crie uma entidade de serviço do Microsoft Entra.

A entidade de serviço Microsoft Entra pode ser criada por meio do portal do Azure ou programaticamente, como no exemplo a seguir.

Essa entidade de serviço é a identidade utilizada pelo conector para gravar dados na tabela do Kusto. Posteriormente, você vai conceder para essa entidade de serviço permissões de acesso aos recursos do Kusto.

  1. Inicie sessão na sua assinatura do Azure com a CLI do Azure. Em seguida, autentique no navegador.

    az login
    
  2. Escolha a assinatura para hospedar a entidade de segurança. Essa etapa é necessária quando você tem várias assinaturas.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Crie a entidade de serviço. Neste exemplo, a entidade de serviço é chamada my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. A partir dos dados JSON retornados, copie o appId, password e tenant para uso futuro.

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

Você criou o aplicativo do Microsoft Entra e a entidade de serviço.

Criar uma tabela de destino

  1. Do seu ambiente de consulta, crie uma tabela chamada Storms usando o seguinte comando:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Crie o mapeamento de tabela correspondente Storms_CSV_Mapping para dados ingeridos usando o seguinte comando:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Crie uma política de ingestão de lote na tabela para latência de ingestão configurável.

    Dica

    A política de envio em lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita dispara a ingestão na tabela.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Use a entidade de serviço de Criar uma entidade de serviço Microsoft Entra para conceder permissão para trabalhar com o banco de dados.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Executar o laboratório

O laboratório a seguir foi projetado para oferecer a você a experiência criar dados, configurar o conector do Kafka e transmitir esses dados. Em seguida, você pode examinar os dados ingeridos.

Definir o repositório Git

Clone o repositório git do laboratório.

  1. Crie um diretório local em seu computador.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clonar o repositório.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Conteúdo do repositório clonado

Execute o seguinte comando para listar o conteúdo do repositório clonado:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Esse resultado dessa pesquisa é:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Examinar os arquivos no repositório clonado

As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos.

adx-sink-config.json

Esse arquivo contém o arquivo de propriedades do coletor Kusto em que você atualiza detalhes de configuração específicos:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Substitua os valores dos seguintes atributos de acordo com a configuração: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (o nome do banco de dados), kusto.ingestion.url e kusto.query.url.

Conector – Dockerfile

Esse arquivo tem os comandos para gerar a imagem do Docker para a instância do conector. Ele inclui o download do conector do diretório de liberação do repositório git.

Diretório Storm-events-producer

Esse diretório tem um programa Go que lê um arquivo local "StormEvents.csv" e publica os dados em um tópico Kafka.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Iniciar os contêineres

  1. Em um terminal, inicie os contêineres:

    docker-compose up
    

    O aplicativo produtor começa a enviar eventos para o tópico storm-events. Você deverá ver registros semelhantes aos seguintes:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Para verificar os registros, execute o seguinte comando em um terminal separado:

    docker-compose logs -f | grep kusto-connect
    

Iniciar o conector

Use uma chamada Kafka Connect REST para iniciar o conector.

  1. Em um terminal separado, inicie a tarefa do coletor com o seguinte comando:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Para verificar o status, execute o seguinte comando em um terminal separado:

    curl http://localhost:8083/connectors/storm/status
    

O conector inicia os processos de ingestão na fila.

Observação

Se você tiver problemas com o conector de registros, crie um problema.

Identidade gerenciada

Por padrão, o conector Kafka usa o método de aplicativo para autenticação durante a ingestão. Para autenticar usando a identidade gerenciada:

  1. Atribua ao cluster uma identidade gerenciada e conceda permissões de leitura à sua conta de armazenamento. Para obter mais informações, consulte Ingerir dados usando a autenticação de identidade gerenciada.

  2. No arquivo adx-sink-config.json, defina aad.auth.strategy e managed_identity certifique-se de que aad.auth.appid esteja definido como o ID do cliente de identidade gerenciada (aplicativo).

  3. Use um token de serviço de metadados de instância privada em vez da entidade de serviço do Microsoft Entra.

Observação

Ao usar uma identidade gerenciada, appId e tenant são deduzidos do contexto do site de chamada e password não são necessários.

Consultar e examinar dados

Confirmar ingestão de dados

  1. Assim que os dados chegarem à Storms tabela, verifique a contagem de linhas e confirme a transferência de dados:

    Storms 
    | count
    
  2. Confirme se não há falhas no processo de ingestão:

    .show ingestion failures
    

    Depois de ver os dados, experimente algumas consultas.

Consultar os dados

  1. Para ver todos os registros, execute a seguinte consulta:

    Storms
    | take 10
    
  2. Use where e project para filtrar dados específicos:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Use o operador summarize:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Captura de tela dos resultados do gráfico de colunas de consulta Kafka conectados.

Para obter mais exemplos de consulta e diretrizes, consulte Gravar consultas no KQL e a Documentação da linguagem de consulta Kusto.

Reset

Para reiniciar, execute as etapas a seguir:

  1. Parar os contêineres (docker-compose down -v)
  2. Excluir (drop table Storms)
  3. Recriar a tabela Storms
  4. Recriar mapeamento de tabela
  5. Reiniciar os contêineres (docker-compose up)

Limpar os recursos

Para excluir os recursos do Azure Data Explorer, use az kusto cluster delete (extensão kusto) ou az kusto database delete (extensão kusto):

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

Você também pode excluir o cluster e o banco de dados por meio do portal do Azure. Para obter mais informações, consulte Excluir um cluster do Azure Data Explorer e Excluir um banco de dados no Azure Data Explorer.

Ajustando o conector do coletor Kafka

Ajuste o conector do coletor Kafka para trabalhar com a política de envio em lote de ingestão:

  • Ajuste o limite de tamanho de flush.size.bytes do coletor Kafka começando em 1 MB, aumentando em incrementos de 10 MB ou 100 MB.
  • Ao usar o coletor Kafka, os dados são agregados duas vezes. Os dados do lado do conector são agregados conforme as configurações de liberação e, no lado do serviço de Azure Data Explorer, conforme a política de envio em lote. Quando o tempo de envio em lote é muito curto e nenhum dado é ingerido pelo conector e pelo serviço, o tempo de envio em lote deve ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, quando o tamanho da liberação é 1 MB e o tamanho da política de envio em lote é 100 MB, o conector do coletor do Kafka agrega dados em um lote de 100 MB. Então, esse lote é ingerido pelo serviço. Quando o tempo de política de envio em lote é de 20 segundos e o conector do coletor Kafka libera 50 MB em um período de 20 segundos, o serviço ingere um lote de 50 MB.
  • Você pode dimensionar adicionando instâncias e partições Kafka. Aumente tasks.max para o número de partições. Crie uma partição se você tiver dados suficientes para produzir um blob do tamanho da configuração flush.size.bytes. Se o blob é menor, o lote é processado quando atingir o limite de tempo. Portanto, a partição não recebe taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.