Agregar dados em um pipeline do processador de dados
Importante
Azure IoT Operations Preview – habilitado pelo Azure Arc está atualmente em visualização. Não deve utilizar este software de pré-visualização em ambientes de produção.
Você precisará implantar uma nova instalação do Azure IoT Operations quando uma versão disponível em geral for disponibilizada, você não poderá atualizar uma instalação de visualização.
Veja Termos de Utilização Complementares da Pré-visualizações do Microsoft Azure para obter os termos legais que se aplicam às funcionalidades do Azure que estão na versão beta, na pré-visualização ou que ainda não foram lançadas para disponibilidade geral.
O estágio agregado é um estágio de pipeline intermediário, configurável e opcional que permite executar operações de redução de amostragem e processamento em lote de dados do sensor de streaming em janelas de tempo definidas pelo usuário.
Use um estágio de agregação para acumular mensagens em uma janela definida e calcular valores de agregação de propriedades nas mensagens. O estágio emite os valores agregados como propriedades em uma única mensagem no final de cada janela de tempo.
- Cada partição de pipeline realiza agregação independentemente uma da outra.
- A saída do estágio é uma única mensagem que contém todas as propriedades agregadas definidas.
- O estágio deixa cair todas as outras propriedades. No entanto, você pode usar as funções Last, First ou Collect para preservar propriedades que, de outra forma, seriam descartadas pelo estágio durante a agregação.
- Para que o estágio de agregação funcione, o estágio da fonte de dados no pipeline deve desserializar a mensagem de entrada.
Pré-requisitos
Para configurar e usar um estágio de pipeline agregado, você precisa de uma instância implantada do processador de dados que inclua o componente opcional do processador de dados.
Configurar o palco
A configuração JSON do estágio agregado define os detalhes do estágio. Para criar o palco, você pode interagir com a interface do usuário baseada em formulário ou fornecer a configuração JSON na guia Avançado :
Campo | Tipo | Descrição | Necessário | Predefinição | Exemplo |
---|---|---|---|---|---|
Nome | Cadeia (de carateres) | Um nome a ser exibido na interface do usuário do processador de dados. | Sim | - | Calculate Aggregate |
Description | String | Uma descrição de fácil utilização do que faz a fase agregada. | Não | Aggregation over temperature |
|
Janela de tempo | Duração que especifica o período durante o qual a agregação é executada. | Sim | - | 10s |
|
Função Propriedades > | Enumeração | A função agregada a ser usada. | Sim | - | Sum |
Propriedades > InputPath1 | Caminho | O caminho para a propriedade na mensagem de entrada para aplicar a função. | Sim | - | .payload.temperature |
Propriedades > OutputPath2 | Caminho | O caminho para o local na mensagem de saída para colocar o resultado. | Sim | - | .payload.temperature.average |
Você pode definir várias configurações de Propriedades em um estágio agregado. Por exemplo, calcule a soma da temperatura e calcule a média da pressão.
1 Caminho de entrada:
- O tipo de dados do valor da propriedade de caminho de entrada deve ser compatível com o tipo de função definida.
- Você pode fornecer o mesmo caminho de entrada em várias configurações de agregação para calcular várias funções sobre a mesma propriedade de caminho de entrada. Certifique-se de que os caminhos de saída são diferentes para evitar a substituição dos resultados.
2 Caminho de saída:
- Os caminhos de saída podem ser iguais ou diferentes do caminho de entrada. Use caminhos de saída diferentes se estiver calculando várias agregações na mesma propriedade de caminho de entrada.
- Configure caminhos de saída distintos para evitar a substituição de valores agregados.
Windows
A janela é o intervalo de tempo durante o qual o palco acumula mensagens. No final da janela, o palco aplica a função configurada às propriedades da mensagem. Em seguida, o palco emite uma única mensagem.
Atualmente, o palco suporta apenas janelas de tombamento .
As janelas de tombamento são uma série de intervalos de tempo de tamanho fixo, não sobrepostos e consecutivos. A janela começa e termina em pontos fixos no tempo:
O tamanho da janela define o intervalo de tempo durante o qual o palco acumula as mensagens. Você define o tamanho da janela usando o padrão comum Duração .
Funções
O estágio de agregação suporta as seguintes funções para calcular valores agregados sobre a propriedade message definida no caminho de entrada:
Function | Description |
---|---|
Soma | Calcula a soma dos valores da propriedade nas mensagens de entrada. |
Média | Calcula a média dos valores da propriedade nas mensagens de entrada. |
Count | Conta o número de vezes que a propriedade aparece na janela. |
Min | Calcula o valor mínimo dos valores da propriedade nas mensagens de entrada. |
Máx | Calcula o valor máximo dos valores da propriedade nas mensagens de entrada. |
Last | Retorna o valor mais recente dos valores da propriedade nas mensagens de entrada. |
First | Retorna o primeiro valor dos valores da propriedade nas mensagens de entrada. |
Collect | Retorne todos os valores da propriedade nas mensagens de entrada. |
A tabela a seguir lista os tipos de dados de mensagem suportados por cada função:
Function | Número inteiro | Float | String | Datetime | Matriz | Object | Binário |
---|---|---|---|---|---|---|---|
Sum | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Média | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ |
Contagem | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Mín. | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Max | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ | ❌ |
Last | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
First | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Collect | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Configuração de exemplo
O exemplo JSON a seguir mostra uma configuração completa de estágio agregado:
{
"displayName":"downSample",
"description":"Calculate average for production tags",
"window":
{
"type":"tumbling",
"size":"10s"
},
"properties":
[
{
"function":"average",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_avg"
},
{
"function":"collect",
"inputPath": ".payload.temperature",
"outputPath":".payload.temperature_all"
},
{
"function":"average",
"inputPath":".payload.pressure",
"outputPath":".payload.pressure"
},
{
"function":"last",
"inputPath":".systemProperties",
"outputPath": ".systemProperties"
}
]
}
A configuração define um estágio agregado que calcula, ao longo de uma janela de dez segundos:
- Temperatura média
- Soma da temperatura
- Soma da pressão
Exemplo
Este exemplo inclui duas mensagens de entrada de exemplo e uma mensagem de saída de exemplo gerada usando a configuração anterior:
Mensagem de entrada 1:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 10,
"temperature":250,
"pressure":30,
"runningState": true
}
}
Mensagem de entrada 2:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"qos":1,
"topic":"/assets/foo/tags/bar",
"properties":{
"responseTopic":"outputs/foo/tags/bar",
"contentType": "application/json"
},
"payload":{
"humidity": 11,
"temperature":235,
"pressure":25,
"runningState": true
}
}
Mensagem de saída:
{
"systemProperties":{
"partitionKey":"foo",
"partitionId":5,
"timestamp":"2023-01-11T10:02:07Z"
},
"payload":{
"temperature_avg":242.5,
"temperature_all":[250,235],
"pressure":27.5
}
}