Ingerir dados do Apache Kafka no Azure Data Explorer
O Apache Kafka é uma plataforma de streaming distribuída para a criação de pipelines de dados de fluxo em tempo real, os quais movem os dados entre sistemas ou aplicativos de modo confiável. O Kafka Connect é uma ferramenta para streaming de dados escalonável e confiável entre o Apache Kafka e outros sistemas. O Coletor Kusto Kafka serve como o conector do Kafka e não requer o uso de código. Faça download do jar do conector do coletor do Repositório git ou Hub do Conector Confluent.
Este artigo mostra como ingerir dados com o Kafka, usando uma configuração autônoma do Docker para simplificar o cluster Kafka e a instalação do cluster do conector Kafka.
Para obter mais informações, consulte o Repositório git do conector e as especificações de versão.
Pré-requisitos
- Uma assinatura do Azure. Criar uma conta gratuita do Azure.
- Um cluster e um banco de dados do Azure Data Explorer com as políticas de cache e retenção padrão.
- CLI do Azure.
- Docker e Docker Compose.
Crie uma entidade de serviço do Microsoft Entra.
A entidade de serviço Microsoft Entra pode ser criada por meio do portal do Azure ou programaticamente, como no exemplo a seguir.
Essa entidade de serviço é a identidade utilizada pelo conector para gravar dados na tabela do Kusto. Posteriormente, você vai conceder para essa entidade de serviço permissões de acesso aos recursos do Kusto.
Inicie sessão na sua assinatura do Azure com a CLI do Azure. Em seguida, autentique no navegador.
az login
Escolha a assinatura para hospedar a entidade de segurança. Essa etapa é necessária quando você tem várias assinaturas.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Crie a entidade de serviço. Neste exemplo, a entidade de serviço é chamada
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
A partir dos dados JSON retornados, copie o
appId
,password
etenant
para uso futuro.{ "appId": "00001111-aaaa-2222-bbbb-3333cccc4444", "displayName": "my-service-principal", "name": "my-service-principal", "password": "00001111-aaaa-2222-bbbb-3333cccc4444", "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444" }
Você criou o aplicativo do Microsoft Entra e a entidade de serviço.
Criar uma tabela de destino
Do seu ambiente de consulta, crie uma tabela chamada
Storms
usando o seguinte comando:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Crie o mapeamento de tabela correspondente
Storms_CSV_Mapping
para dados ingeridos usando o seguinte comando:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Crie uma política de ingestão de lote na tabela para latência de ingestão configurável.
Dica
A política de envio em lote de ingestão é um otimizador de desempenho e inclui três parâmetros. A primeira condição satisfeita dispara a ingestão na tabela.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Use a entidade de serviço de Criar uma entidade de serviço Microsoft Entra para conceder permissão para trabalhar com o banco de dados.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Executar o laboratório
O laboratório a seguir foi projetado para oferecer a você a experiência criar dados, configurar o conector do Kafka e transmitir esses dados. Em seguida, você pode examinar os dados ingeridos.
Definir o repositório Git
Clone o repositório git do laboratório.
Crie um diretório local em seu computador.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Clonar o repositório.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Conteúdo do repositório clonado
Execute o seguinte comando para listar o conteúdo do repositório clonado:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Esse resultado dessa pesquisa é:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Examinar os arquivos no repositório clonado
As seções a seguir explicam as partes importantes dos arquivos na árvore de arquivos.
adx-sink-config.json
Esse arquivo contém o arquivo de propriedades do coletor Kusto em que você atualiza detalhes de configuração específicos:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Substitua os valores dos seguintes atributos de acordo com a configuração: aad.auth.authority
, aad.auth.appid
, aad.auth.appkey
, kusto.tables.topics.mapping
(o nome do banco de dados), kusto.ingestion.url
e kusto.query.url
.
Conector – Dockerfile
Esse arquivo tem os comandos para gerar a imagem do Docker para a instância do conector. Ele inclui o download do conector do diretório de liberação do repositório git.
Diretório Storm-events-producer
Esse diretório tem um programa Go que lê um arquivo local "StormEvents.csv" e publica os dados em um tópico Kafka.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Iniciar os contêineres
Em um terminal, inicie os contêineres:
docker-compose up
O aplicativo produtor começa a enviar eventos para o tópico
storm-events
. Você deverá ver registros semelhantes aos seguintes:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Para verificar os registros, execute o seguinte comando em um terminal separado:
docker-compose logs -f | grep kusto-connect
Iniciar o conector
Use uma chamada Kafka Connect REST para iniciar o conector.
Em um terminal separado, inicie a tarefa do coletor com o seguinte comando:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Para verificar o status, execute o seguinte comando em um terminal separado:
curl http://localhost:8083/connectors/storm/status
O conector inicia os processos de ingestão na fila.
Observação
Se você tiver problemas com o conector de registros, crie um problema.
Identidade gerenciada
Por padrão, o conector Kafka usa o método de aplicativo para autenticação durante a ingestão. Para autenticar usando a identidade gerenciada:
Atribua ao cluster uma identidade gerenciada e conceda permissões de leitura à sua conta de armazenamento. Para obter mais informações, consulte Ingerir dados usando a autenticação de identidade gerenciada.
No arquivo adx-sink-config.json, defina
aad.auth.strategy
emanaged_identity
certifique-se de queaad.auth.appid
esteja definido como o ID do cliente de identidade gerenciada (aplicativo).Use um token de serviço de metadados de instância privada em vez da entidade de serviço do Microsoft Entra.
Observação
Ao usar uma identidade gerenciada, appId
e tenant
são deduzidos do contexto do site de chamada e password
não são necessários.
Consultar e examinar dados
Confirmar ingestão de dados
Assim que os dados chegarem à
Storms
tabela, verifique a contagem de linhas e confirme a transferência de dados:Storms | count
Confirme se não há falhas no processo de ingestão:
.show ingestion failures
Depois de ver os dados, experimente algumas consultas.
Consultar os dados
Para ver todos os registros, execute a seguinte consulta:
Storms | take 10
Use
where
eproject
para filtrar dados específicos:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
Use o operador
summarize
:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Para obter mais exemplos de consulta e diretrizes, consulte Gravar consultas no KQL e a Documentação da linguagem de consulta Kusto.
Reset
Para reiniciar, execute as etapas a seguir:
- Parar os contêineres (
docker-compose down -v
) - Excluir (
drop table Storms
) - Recriar a tabela
Storms
- Recriar mapeamento de tabela
- Reiniciar os contêineres (
docker-compose up
)
Limpar os recursos
Para excluir os recursos do Azure Data Explorer, use az kusto cluster delete (extensão kusto) ou az kusto database delete (extensão kusto):
az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"
Você também pode excluir o cluster e o banco de dados por meio do portal do Azure. Para obter mais informações, consulte Excluir um cluster do Azure Data Explorer e Excluir um banco de dados no Azure Data Explorer.
Ajustando o conector do coletor Kafka
Ajuste o conector do coletor Kafka para trabalhar com a política de envio em lote de ingestão:
- Ajuste o limite de tamanho de
flush.size.bytes
do coletor Kafka começando em 1 MB, aumentando em incrementos de 10 MB ou 100 MB. - Ao usar o coletor Kafka, os dados são agregados duas vezes. Os dados do lado do conector são agregados conforme as configurações de liberação e, no lado do serviço de Azure Data Explorer, conforme a política de envio em lote. Quando o tempo de envio em lote é muito curto e nenhum dado é ingerido pelo conector e pelo serviço, o tempo de envio em lote deve ser aumentado. Defina o tamanho do lote em 1 GB e aumente ou diminua em incrementos de 100 MB, conforme necessário. Por exemplo, quando o tamanho da liberação é 1 MB e o tamanho da política de envio em lote é 100 MB, o conector do coletor do Kafka agrega dados em um lote de 100 MB. Então, esse lote é ingerido pelo serviço. Quando o tempo de política de envio em lote é de 20 segundos e o conector do coletor Kafka libera 50 MB em um período de 20 segundos, o serviço ingere um lote de 50 MB.
- Você pode dimensionar adicionando instâncias e partições Kafka. Aumente
tasks.max
para o número de partições. Crie uma partição se você tiver dados suficientes para produzir um blob do tamanho da configuraçãoflush.size.bytes
. Se o blob é menor, o lote é processado quando atingir o limite de tempo. Portanto, a partição não recebe taxa de transferência suficiente. Um grande número de partições significa mais sobrecarga de processamento.
Conteúdo relacionado
- Saiba mais sobre Arquiteturas de big data.
- Saiba como ingerir dados de amostra formatados em JSON no Azure Data Explorer.
- Saiba mais com o Kafka Labs: