Configurar um estágio de origem MQ em um pipeline do processador de dados
Importante
Azure IoT Operations Preview – habilitado pelo Azure Arc está atualmente em visualização. Não deve utilizar este software de pré-visualização em ambientes de produção.
Você precisará implantar uma nova instalação do Azure IoT Operations quando uma versão disponível em geral for disponibilizada, você não poderá atualizar uma instalação de visualização.
Veja Termos de Utilização Complementares da Pré-visualizações do Microsoft Azure para obter os termos legais que se aplicam às funcionalidades do Azure que estão na versão beta, na pré-visualização ou que ainda não foram lançadas para disponibilidade geral.
O estágio de origem é o primeiro e necessário estágio em um pipeline de processador de dados. O estágio de origem recebe dados no pipeline de processamento de dados e os prepara para processamento posterior. O estágio de origem MQ permite que você assine mensagens de um tópico MQTT. No estágio de origem, você define detalhes de conexão com a fonte MQ e estabelece uma configuração de particionamento com base em seus requisitos específicos de processamento de dados.
Pré-requisitos
- Uma instância implantada do processador de dados que inclui o componente opcional do processador de dados.
- Uma instância do broker MQTT com todos os dados brutos necessários disponíveis é operacional e acessível.
Configurar o código-fonte MQ
Para configurar a origem MQ:
- Forneça detalhes de conexão para a fonte MQ. Essa configuração inclui o tipo da fonte MQ, a URL do broker MQTT, o nível de Qualidade de Serviço (QoS), o tipo de sessão e os tópicos a serem assinados.
- Especifique o método de autenticação. Atualmente limitado a autenticação baseada em nome de usuário/senha ou token de conta de serviço.
A tabela a seguir descreve os parâmetros de configuração de origem MQ:
Campo | Descrição | Necessário | Predefinição | Exemplo |
---|---|---|---|---|
Nome | Um nome visível para o cliente para o estágio de origem. | Necessário | ND | asset-1broker |
Description | Uma descrição visível do estágio de origem visível pelo cliente. | Opcional | ND | brokerforasset-1 |
Mediador | A URL do broker MQTT ao qual se conectar. | Necessário | ND | tls://aio-mq-dmqtt-frontend:8883 |
Autenticação | O método de autenticação para se conectar ao broker. Um dos: None , Username/Password , e Service Account Token (SAT) . |
Necessário | Service Account Token (SAT) |
Service Account Token (SAT) |
Nome de utilizador/Palavra-passe > Nome de utilizador | O nome de usuário para a autenticação de nome de usuário/senha | Sim | ND | myuser |
Segredo do nome de utilizador/palavra-passe > | Referência à palavra-passe armazenada no Cofre de Chaves do Azure. | Sim | ND | AKV_USERNAME_PASSWORD |
QoS | Nível de QoS para entrega de mensagens. | Necessário | 1 | 0 |
Sessão limpa | Defina como FALSE para uma sessão persistente. |
Necessário | FALSE |
FALSE |
Tópico | O tópico a subscrever para aquisição de dados. | Necessário | ND | contoso/site1/asset1 , contoso/site1/asset2 |
Para saber mais sobre segredos, consulte Gerenciar segredos para sua implantação do Azure IoT Operations Preview.
O processador de dados não reordena dados fora de ordem provenientes do broker MQTT. Se os dados forem recebidos fora de ordem do corretor, eles permanecerão assim no pipeline.
Selecionar formato de dados
Em um pipeline do processador de dados, o campo de formato no estágio de origem especifica como desserializar os dados de entrada. Por padrão, o pipeline do processador de dados usa o raw
formato que significa que ele não converte os dados de entrada. Para usar muitos recursos do processador de dados, como Filter
estágios ou Enrich
estágios em um pipeline, você deve desserializar seus dados no estágio de entrada. Você pode optar por desserializar seus dados de entrada de , , , CSV
, ou Protobuf
formatos em uma mensagem legível do processador de dados para usar a funcionalidade completa do processador de JSON
dados. CBOR
MessagePack
jsonStream
As tabelas a seguir descrevem as diferentes opções de configuração de desserialização:
Campo | Descrição | Necessário | Predefinição | Value |
---|---|---|---|---|
Formato de Dados | O tipo do formato de dados. | Sim | Raw |
Raw JSON jsonStream MessagePack CBOR CSV Protobuf |
O Data Format
campo é obrigatório e o seu valor determina os outros campos obrigatórios.
Para desserializar mensagens CSV, você também precisa especificar os seguintes campos:
Campo | Descrição | Necessário | valor | Exemplo |
---|---|---|---|---|
Cabeçalho | Se os dados CSV incluem uma linha de cabeçalho. | Sim | Yes No |
No |
Nome | Nome da coluna em CSV | Sim | - | temp , asset |
Caminho | O caminho jq na mensagem onde as informações da coluna são adicionadas. | Não | - | O caminho jq padrão é o nome da coluna |
Tipo de Dados | O tipo de dados dos dados na coluna e como eles são representados dentro do pipeline do processador de dados. | Não | String , Float , Integer , Boolean , Bytes |
Predefinição: String |
Para desserializar mensagens Protobuf, você também precisa especificar os seguintes campos:
Campo | Descrição | Necessário | valor | Exemplo |
---|---|---|---|---|
Descritor | O descritor codificado em base64 para a definição de protobuf. | Sim | - | Zhf... |
Mensagem | O nome do tipo de mensagem usado para formatar os dados. | Sim | - | pipeline |
Pacote | O nome do pacote no descritor onde o tipo é definido. | Sim | - | schedulerv1 |
Nota
O processador de dados suporta apenas um tipo de mensagem em cada arquivo .proto.
Configurar particionamento
O particionamento em um pipeline divide os dados de entrada em partições separadas. O particionamento permite o paralelismo de dados no pipeline, o que pode melhorar a taxa de transferência e reduzir a latência. As estratégias de particionamento afetam a forma como os dados são processados nos outros estágios do pipeline. Por exemplo, o último estágio de valor conhecido e o estágio agregado operam em cada partição lógica.
Para particionar seus dados, especifique uma estratégia de particionamento e o número de partições a serem usadas:
Campo | Descrição | Necessário | Predefinição | Exemplo |
---|---|---|---|---|
Tipo de partição | O tipo de particionamento a ser usado: Partição ID ou Partição Key |
Necessário | Key |
Key |
Expressão de partição | A expressão jq a ser usada na mensagem de entrada para calcular a partição ID ou partição Key |
Necessário | .topic |
.topic |
Número de partições | O número de partições em um pipeline do processador de dados. | Necessário | 2 |
2 |
O processador de dados adiciona metadados adicionais à mensagem de entrada. Consulte Visão geral da estrutura de mensagens do processador de dados para entender como especificar corretamente a expressão de particionamento que é executada na mensagem de entrada. Por padrão, a expressão de particionamento é definida como 0
com o tipo Partition para ID
enviar todos os dados de entrada para uma única partição.
Para obter recomendações e saber mais, consulte O que é particionamento?.
Configuração de exemplo
A seguir mostra um exemplo de configuração para o estágio:
Parâmetro | Valor |
---|---|
Nome | input data |
Mediador | tls://aio-mq-dmqtt-frontend:8883 |
Autenticação | Service Account Token (SAT) |
Tópico | azure-iot-operations/data/opc-ua-connector-0/# |
Formato dos dados | JSON |
Em seguida, essa configuração gera mensagens parecidas com o exemplo a seguir:
{
"Timestamp": "2023-08-10T00:54:58.6572007Z",
"MessageType": "ua-deltaframe",
"payload": {
"temperature": {
"SourceTimestamp": "2023-08-10T00:54:58.2543129Z",
"Value": 7109
},
"Tag 10": {
"SourceTimestamp": "2023-08-10T00:54:58.2543482Z",
"Value": 7109
}
},
"DataSetWriterName": "oven",
"SequenceNumber": 4660
}