Partilhar via


Configurar Hubs de Eventos do Azure e pontos de extremidade de fluxo de dados Kafka

Importante

Esta página inclui instruções para gerenciar componentes do Azure IoT Operations usando manifestos de implantação do Kubernetes, que está em visualização. Esse recurso é fornecido com várias limitações e não deve ser usado para cargas de trabalho de produção.

Veja Termos de Utilização Complementares da Pré-visualizações do Microsoft Azure para obter os termos legais que se aplicam às funcionalidades do Azure que estão na versão beta, na pré-visualização ou que ainda não foram lançadas para disponibilidade geral.

Para configurar a comunicação bidirecional entre as Operações IoT do Azure e os corretores Apache Kafka, você pode configurar um ponto de extremidade de fluxo de dados. Essa configuração permite especificar o ponto de extremidade, TLS (Transport Layer Security), autenticação e outras configurações.

Pré-requisitos

Hubs de Eventos do Azure

Os Hubs de Eventos do Azure são compatíveis com o protocolo Kafka e podem ser usados com fluxos de dados com algumas limitações.

Criar um namespace e um hub de eventos dos Hubs de Eventos do Azure

Primeiro, crie um namespace de Hubs de Eventos do Azure habilitado para Kafka

Em seguida, crie um hub de eventos no namespace. Cada centro de eventos individual corresponde a um tópico de Kafka. Você pode criar vários hubs de eventos no mesmo namespace para representar vários tópicos do Kafka.

Atribuir permissão à identidade gerenciada

Para configurar um ponto de extremidade de fluxo de dados para Hubs de Eventos do Azure, recomendamos usar uma identidade gerenciada atribuída pelo usuário ou pelo sistema. Essa abordagem é segura e elimina a necessidade de gerenciar credenciais manualmente.

Depois que o namespace e o hub de eventos dos Hubs de Eventos do Azure forem criados, você precisará atribuir uma função à identidade gerenciada das Operações IoT do Azure que conceda permissão para enviar ou receber mensagens para o hub de eventos.

Se estiver usando a identidade gerenciada atribuída ao sistema, no portal do Azure, vá para sua instância de Operações IoT do Azure e selecione Visão geral. Copie o nome da extensão listada após a extensão Azure IoT Operations Arc. Por exemplo, azure-iot-operations-xxxx7. Sua identidade gerenciada atribuída ao sistema pode ser encontrada usando o mesmo nome da extensão Azure IoT Operations Arc.

Em seguida, vá para o namespace >Hubs de Eventos Controle de acesso (IAM)>Adicionar atribuição de função.

  1. Na guia Função, selecione uma função apropriada como Azure Event Hubs Data Sender ou Azure Event Hubs Data Receiver. Isso dá à identidade gerenciada as permissões necessárias para enviar ou receber mensagens para todos os hubs de eventos no namespace. Para saber mais, consulte Autenticar um aplicativo com a ID do Microsoft Entra para acessar recursos de Hubs de Eventos.
  2. No separador Membros:
    1. Se estiver usando a identidade gerenciada atribuída pelo sistema, para Atribuir acesso a, selecione a opção Usuário, grupo ou entidade de serviço, selecione + Selecionar membros e procure o nome da extensão do Azure IoT Operations Arc.
    2. Se estiver usando a identidade gerenciada atribuída pelo usuário, para Atribuir acesso a, selecione a opção Identidade gerenciada e, em seguida, selecione + Selecionar membros e procure sua identidade gerenciada atribuída pelo usuário configurada para conexões de nuvem.

Criar ponto de extremidade de fluxo de dados para Hubs de Eventos do Azure

Depois que o namespace e o hub de eventos dos Hubs de Eventos do Azure estiverem configurados, você poderá criar um ponto de extremidade de fluxo de dados para o namespace dos Hubs de Eventos do Azure habilitado para Kafka.

  1. Na experiência de operações, selecione a guia Pontos de extremidade de fluxo de dados.

  2. Em Criar novo ponto de extremidade de fluxo de dados, selecione Hubs de>Eventos do Azure Novo.

    Captura de tela usando a experiência de operações para criar um ponto de extremidade de fluxo de dados dos Hubs de Eventos do Azure.

  3. Insira as seguintes configurações para o ponto de extremidade:

    Definição Description
    Name O nome do ponto de extremidade do fluxo de dados.
    Host O nome do host do corretor Kafka no formato <NAMESPACE>.servicebus.windows.net:9093. Inclua o número 9093 da porta na configuração do host para Hubs de Eventos.
    Método de autenticação O método usado para autenticação. Recomendamos que você escolha Identidade gerenciada atribuída ao sistema ou Identidade gerenciada atribuída pelo usuário.
  4. Selecione Aplicar para provisionar o ponto de extremidade.

Nota

O tópico Kafka, ou hub de eventos individual, é configurado posteriormente quando você cria o fluxo de dados. O tópico Kafka é o destino das mensagens de fluxo de dados.

Usar cadeia de conexão para autenticação em Hubs de Eventos

Importante

Para usar o portal de experiência de operações para gerenciar segredos, as Operações do Azure IoT devem primeiro ser habilitadas com configurações seguras, configurando um Cofre de Chaves do Azure e habilitando identidades de carga de trabalho. Para saber mais, consulte Habilitar configurações seguras na implantação do Azure IoT Operations.

Na página Configurações do ponto de extremidade de fluxo de dados da experiência de operações, selecione a guia Básico e escolha Método de autenticação>SASL.

Insira as seguintes configurações para o ponto de extremidade:

Definição Descrição
Tipo SASL Selecione Plain.
Nome secreto sincronizado Insira um nome do segredo do Kubernetes que contém a cadeia de conexão.
Referência de nome de usuário ou segredo de token A referência ao nome de usuário ou segredo de token usado para autenticação SASL. Selecione-o na lista Cofre da Chave ou crie um novo. O valor deve ser $ConnectionString.
Referência de senha do segredo do token A referência à senha ou segredo de token usado para autenticação SASL. Selecione-o na lista Cofre da Chave ou crie um novo. O valor deve estar no formato de Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>.

Depois de selecionar Adicionar referência, se você selecionar Criar novo, insira as seguintes configurações:

Definição Descrição
Nome do segredo O nome do segredo no Cofre da Chave do Azure. Escolha um nome que seja fácil de lembrar para selecionar o segredo mais tarde na lista.
Valor secreto Para o nome de usuário, digite $ConnectionString. Para a senha, digite a cadeia de conexão no formato Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>.
Definir data de ativação Se ativado, a data em que o segredo se torna ativo.
Definir data de validade Se ativado, a data em que o segredo expira.

Para saber mais sobre segredos, consulte Criar e gerenciar segredos nas Operações do Azure IoT.

Limitações

Os Hubs de Eventos do Azure não suportam todos os tipos de compressão suportados pelo Kafka. Atualmente, apenas a compactação GZIP é suportada nas camadas premium e dedicada dos Hubs de Eventos do Azure. O uso de outros tipos de compactação pode resultar em erros.

Corretores Kafka personalizados

Para configurar um ponto de extremidade de fluxo de dados para corretores Kafka que não sejam do Hub de Eventos, defina o host, TLS, autenticação e outras configurações conforme necessário.

  1. Na experiência de operações, selecione a guia Pontos de extremidade de fluxo de dados.

  2. Em Criar novo ponto de extremidade de fluxo de dados, selecione Novo Broker Kafka Personalizado>.

    Captura de tela usando a experiência de operações para criar um ponto de extremidade de fluxo de dados Kafka.

  3. Insira as seguintes configurações para o ponto de extremidade:

    Definição Description
    Name O nome do ponto de extremidade do fluxo de dados.
    Host O nome do host do corretor Kafka no formato <Kafka-broker-host>:xxxx. Inclua o número da porta na configuração do host.
    Método de autenticação O método usado para autenticação. Escolha SASL.
    Tipo SASL O tipo de autenticação SASL. Escolha Plain, ScramSha256 ou ScramSha512. Obrigatório se estiver usando SASL.
    Nome secreto sincronizado O nome do segredo. Obrigatório se estiver usando SASL.
    Referência de nome de usuário do segredo do token A referência ao nome de usuário no segredo do token SASL. Obrigatório se estiver usando SASL.
  4. Selecione Aplicar para provisionar o ponto de extremidade.

Nota

Atualmente, a experiência de operações não suporta o uso de um ponto de extremidade de fluxo de dados Kafka como fonte. Você pode criar um fluxo de dados com um ponto de extremidade de fluxo de dados Kafka de origem usando Kubernetes ou Bicep.

Para personalizar as configurações do ponto de extremidade, use as seções a seguir para obter mais informações.

Métodos de autenticação disponíveis

Os seguintes métodos de autenticação estão disponíveis para pontos de extremidade de fluxo de dados do broker Kafka.

Identidade gerida atribuída pelo sistema

Antes de configurar o ponto de extremidade de fluxo de dados, atribua uma função à identidade gerenciada do Azure IoT Operations que conceda permissão para se conectar ao broker Kafka:

  1. No portal do Azure, vá para sua instância de Operações do Azure IoT e selecione Visão geral.
  2. Copie o nome da extensão listada após a extensão Azure IoT Operations Arc. Por exemplo, azure-iot-operations-xxxx7.
  3. Vá para o recurso de nuvem que você precisa para conceder permissões. Por exemplo, vá para o namespace >Hubs de Eventos Controle de acesso (IAM)>Adicionar atribuição de função.
  4. Na guia Função, selecione uma função apropriada.
  5. Na guia Membros, para Atribuir acesso a, selecione a opção Usuário, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros e procure a identidade gerenciada das Operações IoT do Azure. Por exemplo, azure-iot-operations-xxxx7.

Em seguida, configure o ponto de extremidade de fluxo de dados com as configurações de identidade gerenciada atribuídas pelo sistema.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Básico e escolha Método de autenticação>Identidade gerenciada atribuída ao sistema.

Essa configuração cria uma identidade gerenciada com o público padrão, que é o mesmo que o valor do host do namespace dos Hubs de Eventos na forma de https://<NAMESPACE>.servicebus.windows.net. No entanto, se você precisar substituir o público padrão, poderá definir o audience campo para o valor desejado.

Não suportado na experiência de operações.

Identidade gerida atribuída pelo utilizador

Para usar a identidade gerenciada atribuída pelo usuário para autenticação, você deve primeiro implantar as Operações do Azure IoT com configurações seguras habilitadas. Em seguida, você precisa configurar uma identidade gerenciada atribuída pelo usuário para conexões na nuvem. Para saber mais, consulte Habilitar configurações seguras na implantação do Azure IoT Operations.

Antes de configurar o ponto de extremidade de fluxo de dados, atribua uma função à identidade gerenciada atribuída pelo usuário que conceda permissão para se conectar ao broker Kafka:

  1. No portal do Azure, vá para o recurso de nuvem que você precisa para conceder permissões. Por exemplo, vá para o controle de acesso ao namespace >da Grade de Eventos (IAM)>Adicionar atribuição de função.
  2. Na guia Função, selecione uma função apropriada.
  3. Na guia Membros, para Atribuir acesso a, selecione a opção Identidade gerenciada e, em seguida, selecione + Selecionar membros e procure sua identidade gerenciada atribuída pelo usuário.

Em seguida, configure o ponto de extremidade de fluxo de dados com as configurações de identidade gerenciada atribuídas pelo usuário.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Básico e escolha Método de autenticação>Identidade gerenciada atribuída pelo usuário.

Aqui, o escopo é o público da identidade gerenciada. O valor padrão é o mesmo que o valor do host do namespace dos Hubs de Eventos na forma de https://<NAMESPACE>.servicebus.windows.net. No entanto, se você precisar substituir o público padrão, poderá definir o campo de escopo para o valor desejado usando Bicep ou Kubernetes.

SASL

Para usar SASL para autenticação, especifique o método de autenticação SASL e configure o tipo SASL e uma referência secreta com o nome do segredo que contém o token SASL.

Na página Configurações do ponto de extremidade de fluxo de dados da experiência de operações, selecione a guia Básico e escolha Método de autenticação>SASL.

Insira as seguintes configurações para o ponto de extremidade:

Definição Descrição
Tipo SASL O tipo de autenticação SASL a ser usado. Os tipos suportados são Plain, ScramSha256e ScramSha512.
Nome secreto sincronizado O nome do segredo do Kubernetes que contém o token SASL.
Referência de nome de usuário ou segredo de token A referência ao nome de usuário ou segredo de token usado para autenticação SASL.
Referência de senha do segredo do token A referência à senha ou segredo de token usado para autenticação SASL.

Os tipos de SASL suportados são:

  • Plain
  • ScramSha256
  • ScramSha512

O segredo deve estar no mesmo namespace que o ponto de extremidade do fluxo de dados Kafka. O segredo deve ter o token SASL como um par chave-valor.

Anónimo

Para usar a autenticação anônima, atualize a seção de autenticação das configurações do Kafka para usar o método Anonymous.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Básico e escolha Método de autenticação>Nenhum.

Definições avançadas

Você pode definir configurações avançadas para o ponto de extremidade de fluxo de dados Kafka, como TLS, certificado de CA confiável, configurações de mensagens Kafka, processamento em lote e CloudEvents. Você pode definir essas configurações na guia Portal avançado do ponto de extremidade do fluxo de dados ou no recurso do ponto de extremidade do fluxo de dados.

Na experiência de operações, selecione a guia Avançado para o ponto de extremidade de fluxo de dados.

Captura de tela usando a experiência de operações para definir as configurações avançadas do ponto de extremidade de fluxo de dados Kafka.

Definições do TLS

Modo TLS

Para habilitar ou desabilitar o TLS para o ponto de extremidade Kafka, atualize a mode configuração nas configurações do TLS.

Na página Configurações do ponto de extremidade de fluxo de dados da experiência de operações, marque a guia Avançado e use a caixa de seleção ao lado de Modo TLS habilitado.

O modo TLS pode ser definido como Enabled ou Disabled. Se o modo estiver definido como Enabled, o fluxo de dados usará uma conexão segura com o broker Kafka. Se o modo estiver definido como Disabled, o fluxo de dados usará uma conexão insegura com o broker Kafka.

Certificado de autoridade de certificação confiável

Configure o certificado de CA confiável para o ponto de extremidade Kafka para estabelecer uma conexão segura com o broker Kafka. Essa configuração é importante se o broker Kafka usar um certificado autoassinado ou um certificado assinado por uma autoridade de certificação personalizada que não é confiável por padrão.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Mapa de configuração do certificado de autoridade de certificação confiável para especificar o ConfigMap que contém o certificado de autoridade de certificação confiável.

Este ConfigMap deve conter o certificado da autoridade de certificação no formato PEM. O ConfigMap deve estar no mesmo namespace que o recurso de fluxo de dados Kafka. Por exemplo:

kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations

Gorjeta

Ao conectar-se aos Hubs de Eventos do Azure, o certificado da autoridade de certificação não é necessário porque o serviço Hubs de Eventos usa um certificado assinado por uma autoridade de certificação pública confiável por padrão.

ID do grupo de consumidores

O ID do grupo de consumidores é usado para identificar o grupo de consumidores que o fluxo de dados usa para ler mensagens do tópico Kafka. O ID do grupo de consumidores deve ser exclusivo dentro do corretor Kafka.

Importante

Quando o ponto de extremidade Kafka é usado como origem, o ID do grupo de consumidores é necessário. Caso contrário, o fluxo de dados não pode ler mensagens do tópico Kafka e você recebe um erro "Os pontos de extremidade de origem do tipo Kafka devem ter um consumerGroupId definido".

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo ID do grupo de consumidores para especificar o ID do grupo de consumidores.

Essa configuração só terá efeito se o ponto de extremidade for usado como fonte (ou seja, o fluxo de dados for um consumidor).

Compressão

O campo de compressão permite a compressão para as mensagens enviadas para tópicos Kafka. A compressão ajuda a reduzir a largura de banda da rede e o espaço de armazenamento necessário para a transferência de dados. No entanto, a compactação também adiciona alguma sobrecarga e latência ao processo. Os tipos de compactação suportados estão listados na tabela a seguir.

valor Description
None Nenhuma compactação ou processamento em lote é aplicado. Nenhum é o valor padrão se nenhuma compactação for especificada.
Gzip A compactação GZIP e o processamento em lote são aplicados. GZIP é um algoritmo de compressão de uso geral que oferece um bom equilíbrio entre taxa de compressão e velocidade. Atualmente, apenas a compactação GZIP é suportada nas camadas premium e dedicada dos Hubs de Eventos do Azure.
Snappy A compactação rápida e o processamento em lote são aplicados. Snappy é um algoritmo de compressão rápida que oferece taxa de compressão moderada e velocidade. Este modo de compressão não é suportado pelos Hubs de Eventos do Azure.
Lz4 A compressão LZ4 e o processamento em lote são aplicados. LZ4 é um algoritmo de compressão rápida que oferece baixa taxa de compressão e alta velocidade. Este modo de compressão não é suportado pelos Hubs de Eventos do Azure.

Para configurar a compressão:

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Compactação para especificar o tipo de compactação.

Essa configuração só terá efeito se o ponto de extremidade for usado como um destino onde o fluxo de dados é um produtor.

Criação de batches

Além da compactação, você também pode configurar o processamento em lote para mensagens antes de enviá-las para tópicos Kafka. O processamento em lote permite agrupar várias mensagens e compactá-las como uma única unidade, o que pode melhorar a eficiência de compactação e reduzir a sobrecarga de rede.

Campo Descrição Obrigatório
mode Pode ser Enabled ou Disabled. O valor padrão é Enabled porque Kafka não tem uma noção de mensagens sem lote. Se definido como Disabled, o processamento em lote é minimizado para criar um lote com uma única mensagem de cada vez. Não
latencyMs O intervalo de tempo máximo, em milissegundos, que as mensagens podem ser armazenadas em buffer antes de serem enviadas. Se esse intervalo for atingido, todas as mensagens armazenadas em buffer serão enviadas como um lote, independentemente de quantas ou quão grandes elas sejam. Se não estiver definido, o valor padrão será 5. Não
maxMessages O número máximo de mensagens que podem ser armazenadas em buffer antes de serem enviadas. Se esse número for atingido, todas as mensagens armazenadas em buffer serão enviadas como um lote, independentemente de quão grande ou por quanto tempo elas estão armazenadas em buffer. Se não estiver definido, o valor padrão será 100000. Não
maxBytes O tamanho máximo em bytes que pode ser armazenado em buffer antes de ser enviado. Se esse tamanho for atingido, todas as mensagens armazenadas em buffer serão enviadas como um lote, independentemente de quantas ou por quanto tempo elas ficarão armazenadas em buffer. O valor padrão é 1000000 (1 MB). Não

Por exemplo, se você definir latencyMs como 1000, maxMessages como 100 e maxBytes como 1024, as mensagens serão enviadas quando houver 100 mensagens no buffer ou quando houver 1.024 bytes no buffer ou quando passarem 1.000 milissegundos desde o último envio, o que ocorrer primeiro.

Para configurar o processamento em lote:

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Lote habilitado para habilitar o envio em lote. Use os campos Latência em lote, Bytes máximos e Contagem de mensagens para especificar as configurações de lote.

Essa configuração só terá efeito se o ponto de extremidade for usado como um destino onde o fluxo de dados é um produtor.

Estratégia de manipulação de partições

A estratégia de manipulação de partições controla como as mensagens são atribuídas às partições Kafka ao enviá-las para tópicos Kafka. As partições Kafka são segmentos lógicos de um tópico Kafka que permitem processamento paralelo e tolerância a falhas. Cada mensagem em um tópico Kafka tem uma partição e um deslocamento, que são usados para identificar e ordenar as mensagens.

Essa configuração só terá efeito se o ponto de extremidade for usado como um destino onde o fluxo de dados é um produtor.

Por padrão, um fluxo de dados atribui mensagens a partições aleatórias, usando um algoritmo round-robin. No entanto, você pode usar diferentes estratégias para atribuir mensagens a partições com base em alguns critérios, como o nome do tópico MQTT ou uma propriedade de mensagem MQTT. Isso pode ajudá-lo a obter um melhor balanceamento de carga, localidade de dados ou ordenação de mensagens.

valor Description
Default Atribui mensagens a partições aleatórias, usando um algoritmo round-robin. Este é o valor padrão se nenhuma estratégia for especificada.
Static Atribui mensagens a um número de partição fixo derivado do ID da instância do fluxo de dados. Isso significa que cada instância de fluxo de dados envia mensagens para uma partição diferente. Isso pode ajudar a obter um melhor balanceamento de carga e localização de dados.
Topic Usa o nome do tópico MQTT da fonte de fluxo de dados como a chave para particionamento. Isso significa que mensagens com o mesmo nome de tópico MQTT são enviadas para a mesma partição. Isso pode ajudar a obter uma melhor ordenação de mensagens e localização de dados.
Property Usa uma propriedade de mensagem MQTT da fonte de fluxo de dados como a chave para particionamento. Especifique o nome da propriedade no partitionKeyProperty campo. Isso significa que mensagens com o mesmo valor de propriedade são enviadas para a mesma partição. Isso pode ajudar a obter uma melhor ordenação de mensagens e localidade de dados com base em um critério personalizado.

Por exemplo, se você definir a estratégia de manipulação de partições como Property e a propriedade da chave de partição como device-id, as mensagens com a mesma device-id propriedade serão enviadas para a mesma partição.

Para configurar a estratégia de manipulação de partições:

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Estratégia de manipulação de partições para especificar a estratégia de manipulação de partições. Use o campo Propriedade da chave de partição para especificar a propriedade usada para particionamento se a estratégia estiver definida como Property.

Agradecimentos Kafka

Os agradecimentos Kafka (acks) são usados para controlar a durabilidade e consistência das mensagens enviadas aos tópicos Kafka. Quando um produtor envia uma mensagem para um tópico Kafka, ele pode solicitar diferentes níveis de agradecimentos do corretor Kafka para garantir que a mensagem seja escrita com sucesso no tópico e replicada em todo o cluster Kafka.

Essa configuração só terá efeito se o ponto de extremidade for usado como destino (ou seja, o fluxo de dados for um produtor).

valor Description
None O fluxo de dados não espera por nenhum reconhecimento da corretora Kafka. Esta configuração é a opção mais rápida, mas menos durável.
All O fluxo de dados aguarda que a mensagem seja gravada na partição de líder e em todas as partições de seguidores. Esta configuração é a opção mais lenta, mas mais durável. Essa configuração também é a opção padrão
One O fluxo de dados aguarda que a mensagem seja gravada na partição líder e em pelo menos uma partição de seguidor.
Zero O fluxo de dados aguarda que a mensagem seja gravada na partição do líder, mas não espera por nenhum reconhecimento dos seguidores. Isto é mais rápido, One mas menos durável.

Por exemplo, se você definir a confirmação de Kafka como All, o fluxo de dados aguardará que a mensagem seja gravada na partição líder e em todas as partições de seguidores antes de enviar a próxima mensagem.

Para configurar os agradecimentos Kafka:

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo de confirmação de Kafka para especificar o nível de confirmação de Kafka.

Essa configuração só terá efeito se o ponto de extremidade for usado como um destino onde o fluxo de dados é um produtor.

Copiar propriedades MQTT

Por padrão, a configuração de propriedades MQTT de cópia está habilitada. Essas propriedades de usuário incluem valores como subject o que armazena o nome do ativo que envia a mensagem.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, marque a guia Avançado e use a caixa de seleção ao lado do campo Copiar propriedades MQTT para habilitar ou desabilitar a cópia das propriedades MQTT.

As seções a seguir descrevem como as propriedades MQTT são traduzidas para cabeçalhos de usuário Kafka e vice-versa quando a configuração está habilitada.

Kafka endpoint é um destino

Quando um ponto de extremidade Kafka é um destino de fluxo de dados, todas as propriedades definidas pela especificação MQTT v5 são traduzidas cabeçalhos de usuário Kafka. Por exemplo, uma mensagem MQTT v5 com "Content Type" sendo encaminhada para Kafka se traduz no cabeçalho "Content Type":{specifiedValue}de usuário Kafka. Regras semelhantes se aplicam a outras propriedades MQTT integradas, definidas na tabela a seguir.

Propriedade MQTT Comportamento traduzido
Indicador de Formato de Carga Útil Chave: "Indicador de Formato de Carga Útil"
Valor: "0" (Payload é bytes) ou "1" (Payload é UTF-8)
Tópico de resposta Chave: "Tópico de resposta"
Valor: Cópia do tópico de resposta da mensagem original.
Intervalo de expiração da mensagem Chave: "Intervalo de expiração da mensagem"
Valor: Representação UTF-8 do número de segundos antes da mensagem expirar. Consulte a propriedade Intervalo de expiração da mensagem para obter mais detalhes.
Dados de correlação: Chave: "Dados de correlação"
Valor: Cópia dos dados de correlação da mensagem original. Ao contrário de muitas propriedades MQTT v5 que são codificadas em UTF-8, os dados de correlação podem ser dados arbitrários.
Tipo de conteúdo: Chave: "Tipo de conteúdo"
Valor: cópia do tipo de conteúdo da mensagem original.

Os pares de valor de chave de propriedade do usuário MQTT v5 são traduzidos diretamente para cabeçalhos de usuário Kafka. Se um cabeçalho de usuário em uma mensagem tiver o mesmo nome que uma propriedade MQTT interna (por exemplo, um cabeçalho de usuário chamado "Dados de Correlação"), então se encaminhar o valor da propriedade de especificação MQTT v5 ou a propriedade de usuário é indefinido.

Os fluxos de dados nunca recebem essas propriedades de um MQTT Broker. Assim, um fluxo de dados nunca os encaminha:

  • Alias do tópico
  • Identificadores de Subscrição
A propriedade Intervalo de expiração da mensagem

O intervalo de expiração da mensagem especifica por quanto tempo uma mensagem pode permanecer em um broker MQTT antes de ser descartada.

Quando um fluxo de dados recebe uma mensagem MQTT com o intervalo de expiração de mensagem especificado, ele:

  • Registra a hora em que a mensagem foi recebida.
  • Antes de a mensagem ser emitida para o destino, o tempo é subtraído da mensagem que foi enfileirada do tempo do intervalo de expiração original.
  • Se a mensagem não tiver expirado (a operação acima é > 0), a mensagem será emitida para o destino e conterá o Tempo de Expiração da Mensagem atualizado.
  • Se a mensagem expirou (a operação acima é <= 0), então a mensagem não é emitida pelo destino.

Exemplos:

  • Um fluxo de dados recebe uma mensagem MQTT com Intervalo de Expiração de Mensagem = 3600 segundos. O destino correspondente é temporariamente desconectado, mas é capaz de se reconectar. 1.000 segundos passam antes que esta mensagem MQTT seja enviada para o destino. Nesse caso, a mensagem do destino tem seu intervalo de expiração de mensagem definido como 2600 (3600 - 1000) segundos.
  • O fluxo de dados recebe uma mensagem MQTT com intervalo de expiração de mensagem = 3600 segundos. O destino correspondente é temporariamente desconectado, mas é capaz de se reconectar. Neste caso, no entanto, leva 4.000 segundos para se reconectar. A mensagem expirou e o fluxo de dados não encaminha essa mensagem para o destino.

O ponto de extremidade Kafka é uma fonte de fluxo de dados

Nota

Há um problema conhecido ao usar o ponto de extremidade de Hubs de Eventos como uma fonte de fluxo de dados em que o cabeçalho Kafka é corrompido quando é traduzido para MQTT. Isso só acontece se estiver usando o Hub de Eventos através do cliente do Hub de Eventos que usa AMQP sob as cobertas. Por exemplo, "foo"="bar", o "foo" é traduzido, mas o valor torna-se "\xa1\x03bar".

Quando um ponto de extremidade Kafka é uma fonte de fluxo de dados, os cabeçalhos de usuário Kafka são convertidos para propriedades MQTT v5. A tabela a seguir descreve como os cabeçalhos de usuário Kafka são convertidos para propriedades MQTT v5.

Cabeçalho Kafka Comportamento traduzido
Chave Chave: "Chave"
Valor: Cópia da chave da mensagem original.
Carimbo de Data/Hora Chave: "Timestamp"
Valor: Codificação UTF-8 de Kafka Timestamp, que é o número de milissegundos desde a época Unix.

Os pares chave/valor do cabeçalho do usuário Kafka - desde que todos estejam codificados em UTF-8 - são traduzidos diretamente nas propriedades de chave/valor do usuário MQTT.

UTF-8 / Incompatibilidades binárias

O MQTT v5 só pode suportar propriedades baseadas em UTF-8. Se o fluxo de dados receber uma mensagem Kafka que contenha um ou mais cabeçalhos não UTF-8, o fluxo de dados irá:

  • Remova a propriedade ou propriedades ofensivas.
  • Encaminhe o restante da mensagem em, seguindo as regras anteriores.

Aplicativos que exigem transferência binária em cabeçalhos Kafka Source => propriedades MQTT Target devem primeiro codificá-los UTF-8 - por exemplo, via Base64.

>=64KB incompatibilidades de propriedade

As propriedades MQTT v5 devem ser menores que 64 KB. Se o fluxo de dados receber uma mensagem Kafka que contenha um ou mais cabeçalhos = >64KB, o fluxo de dados irá:

  • Remova a propriedade ou propriedades ofensivas.
  • Encaminhe o restante da mensagem em, seguindo as regras anteriores.
Tradução de propriedade ao usar Hubs de Eventos e produtores que usam AMQP

Se você tiver um cliente encaminhando mensagens, um ponto de extremidade de origem de fluxo de dados Kafka executando qualquer uma das seguintes ações:

  • Enviando mensagens para Hubs de Eventos usando bibliotecas de cliente como Azure.Messaging.EventHubs
  • Usando AMQP diretamente

Há nuances de tradução de propriedade a serem observadas.

Deve efetuar um dos seguintes procedimentos:

  • Evite enviar propriedades
  • Se você precisar enviar propriedades, envie valores codificados como UTF-8.

Quando os Hubs de Eventos traduzem propriedades de AMQP para Kafka, eles incluem os tipos codificados AMQP subjacentes em sua mensagem. Para obter mais informações sobre o comportamento, consulte Trocando eventos entre consumidores e produtores usando protocolos diferentes.

No exemplo de código a seguir, quando o ponto de extremidade de fluxo de dados recebe o valor "foo":"bar", ele recebe a propriedade como <0xA1 0x03 "bar">.

using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;

var propertyEventBody = new BinaryData("payload");

var propertyEventData = new EventData(propertyEventBody)
{
  Properties =
  {
    {"foo", "bar"},
  }
};

var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);

O ponto de extremidade do fluxo de dados não pode encaminhar a propriedade <0xA1 0x03 "bar"> payload para uma mensagem MQTT porque os dados não são UTF-8. No entanto, se você especificar uma cadeia de caracteres UTF-8, o ponto de extremidade do fluxo de dados traduzirá a cadeia de caracteres antes de enviar para o MQTT. Se você usar uma cadeia de caracteres UTF-8, a mensagem MQTT terá "foo":"bar" como propriedades de usuário.

Apenas cabeçalhos UTF-8 são traduzidos. Por exemplo, dado o seguinte cenário em que a propriedade é definida como um flutuador:

Properties = 
{
  {"float-value", 11.9 },
}

O ponto de extremidade de fluxo de dados descarta pacotes que contêm o "float-value" campo.

Nem todas as propriedades de dados de eventos, incluindo propertyEventData.correlationId, são encaminhadas. Para obter mais informações, consulte Propriedades do usuário do evento,

CloudEventos

CloudEvents são uma maneira de descrever dados de eventos de uma maneira comum. As configurações do CloudEvents são usadas para enviar ou receber mensagens no formato CloudEvents. Você pode usar o CloudEvents para arquiteturas orientadas a eventos em que diferentes serviços precisam se comunicar entre si no mesmo ou em diferentes provedores de nuvem.

As CloudEventAttributes opções são Propagate ouCreateOrRemap.

Na página Configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Atributos de evento da nuvem para especificar a configuração CloudEvents.

As seções a seguir descrevem como as propriedades do CloudEvent são propagadas ou criadas e remapped.

Propagar configuração

As propriedades do CloudEvent são passadas para mensagens que contêm as propriedades necessárias. Se a mensagem não contiver as propriedades necessárias, a mensagem será passada como está. Se as propriedades necessárias estiverem presentes, um ce_ prefixo será adicionado ao nome da propriedade CloudEvent.

Nome Obrigatório Valores de exemplo Nome da saída Valor de saída
specversion Sim 1.0 ce-specversion Passado como está
type Sim ms.aio.telemetry ce-type Passado como está
source Sim aio://mycluster/myoven ce-source Passado como está
id Sim A234-1234-1234 ce-id Passado como está
subject Não aio/myoven/telemetry/temperature ce-subject Passado como está
time Não 2018-04-05T17:31:00Z ce-time Passou como está. Não é recarimbado.
datacontenttype Não application/json ce-datacontenttype Alterado para o tipo de conteúdo de dados de saída após o estágio de transformação opcional.
dataschema Não sr://fabrikam-schemas/123123123234234234234234#1.0.0 ce-dataschema Se um esquema de transformação de dados de saída for fornecido na configuração de transformação, dataschema será alterado para o esquema de saída.

Configuração CreateOrRemap

As propriedades do CloudEvent são passadas para mensagens que contêm as propriedades necessárias. Se a mensagem não contiver as propriedades necessárias, as propriedades serão geradas.

Nome Obrigatório Nome da saída Valor gerado se faltar
specversion Sim ce-specversion 1.0
type Sim ce-type ms.aio-dataflow.telemetry
source Sim ce-source aio://<target-name>
id Sim ce-id UUID gerado no cliente de destino
subject Não ce-subject O tópico de saída para onde a mensagem é enviada
time Não ce-time Gerado como RFC 3339 no cliente de destino
datacontenttype Não ce-datacontenttype Alterado para o tipo de conteúdo de dados de saída após o estágio de transformação opcional
dataschema Não ce-dataschema Esquema definido no registro do esquema

Próximos passos

Para saber mais sobre fluxos de dados, consulte Criar um fluxo de dados.