Partilhar via


Agregar dados em um pipeline do processador de dados

Importante

Azure IoT Operations Preview – habilitado pelo Azure Arc está atualmente em visualização. Não deve utilizar este software de pré-visualização em ambientes de produção.

Você precisará implantar uma nova instalação do Azure IoT Operations quando uma versão disponível em geral for disponibilizada, você não poderá atualizar uma instalação de visualização.

Veja Termos de Utilização Complementares da Pré-visualizações do Microsoft Azure para obter os termos legais que se aplicam às funcionalidades do Azure que estão na versão beta, na pré-visualização ou que ainda não foram lançadas para disponibilidade geral.

O estágio agregado é um estágio de pipeline intermediário, configurável e opcional que permite executar operações de redução de amostragem e processamento em lote de dados do sensor de streaming em janelas de tempo definidas pelo usuário.

Use um estágio de agregação para acumular mensagens em uma janela definida e calcular valores de agregação de propriedades nas mensagens. O estágio emite os valores agregados como propriedades em uma única mensagem no final de cada janela de tempo.

  • Cada partição de pipeline realiza agregação independentemente uma da outra.
  • A saída do estágio é uma única mensagem que contém todas as propriedades agregadas definidas.
  • O estágio deixa cair todas as outras propriedades. No entanto, você pode usar as funções Last, First ou Collect para preservar propriedades que, de outra forma, seriam descartadas pelo estágio durante a agregação.
  • Para que o estágio de agregação funcione, o estágio da fonte de dados no pipeline deve desserializar a mensagem de entrada.

Pré-requisitos

Para configurar e usar um estágio de pipeline agregado, você precisa de uma instância implantada do processador de dados que inclua o componente opcional do processador de dados.

Configurar o palco

A configuração JSON do estágio agregado define os detalhes do estágio. Para criar o palco, você pode interagir com a interface do usuário baseada em formulário ou fornecer a configuração JSON na guia Avançado :

Campo Tipo Descrição Necessário Predefinição Exemplo
Nome Cadeia (de carateres) Um nome a ser exibido na interface do usuário do processador de dados. Sim - Calculate Aggregate
Description String Uma descrição de fácil utilização do que faz a fase agregada. Não Aggregation over temperature
Janela de tempo Duração que especifica o período durante o qual a agregação é executada. Sim - 10s
Função Propriedades > Enumeração A função agregada a ser usada. Sim - Sum
Propriedades > InputPath1 Caminho O caminho para a propriedade na mensagem de entrada para aplicar a função. Sim - .payload.temperature
Propriedades > OutputPath2 Caminho O caminho para o local na mensagem de saída para colocar o resultado. Sim - .payload.temperature.average

Você pode definir várias configurações de Propriedades em um estágio agregado. Por exemplo, calcule a soma da temperatura e calcule a média da pressão.

1 Caminho de entrada:

  • O tipo de dados do valor da propriedade de caminho de entrada deve ser compatível com o tipo de função definida.
  • Você pode fornecer o mesmo caminho de entrada em várias configurações de agregação para calcular várias funções sobre a mesma propriedade de caminho de entrada. Certifique-se de que os caminhos de saída são diferentes para evitar a substituição dos resultados.

2 Caminho de saída:

  • Os caminhos de saída podem ser iguais ou diferentes do caminho de entrada. Use caminhos de saída diferentes se estiver calculando várias agregações na mesma propriedade de caminho de entrada.
  • Configure caminhos de saída distintos para evitar a substituição de valores agregados.

Windows

A janela é o intervalo de tempo durante o qual o palco acumula mensagens. No final da janela, o palco aplica a função configurada às propriedades da mensagem. Em seguida, o palco emite uma única mensagem.

Atualmente, o palco suporta apenas janelas de tombamento .

As janelas de tombamento são uma série de intervalos de tempo de tamanho fixo, não sobrepostos e consecutivos. A janela começa e termina em pontos fixos no tempo:

Diagrama que mostra janelas de tombamento de 10 segundos no estágio agregado.

O tamanho da janela define o intervalo de tempo durante o qual o palco acumula as mensagens. Você define o tamanho da janela usando o padrão comum Duração .

Funções

O estágio de agregação suporta as seguintes funções para calcular valores agregados sobre a propriedade message definida no caminho de entrada:

Function Description
Soma Calcula a soma dos valores da propriedade nas mensagens de entrada.
Média Calcula a média dos valores da propriedade nas mensagens de entrada.
Count Conta o número de vezes que a propriedade aparece na janela.
Min Calcula o valor mínimo dos valores da propriedade nas mensagens de entrada.
Máx Calcula o valor máximo dos valores da propriedade nas mensagens de entrada.
Last Retorna o valor mais recente dos valores da propriedade nas mensagens de entrada.
First Retorna o primeiro valor dos valores da propriedade nas mensagens de entrada.
Collect Retorne todos os valores da propriedade nas mensagens de entrada.

A tabela a seguir lista os tipos de dados de mensagem suportados por cada função:

Function Número inteiro Float String Datetime Matriz Object Binário
Sum
Média
Contagem
Mín.
Max
Last
First
Collect

Configuração de exemplo

O exemplo JSON a seguir mostra uma configuração completa de estágio agregado:

{ 
    "displayName":"downSample", 
    "description":"Calculate average for production tags", 
    "window": 
    { 
        "type":"tumbling", 
        "size":"10s" 
    }, 
    "properties": 
    [ 
        { 
            "function":"average", 
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_avg" 
        }, 
        {  
            "function":"collect",  
            "inputPath": ".payload.temperature", 
            "outputPath":".payload.temperature_all"  
        },  
        {  
            "function":"average",  
            "inputPath":".payload.pressure", 
            "outputPath":".payload.pressure"                  
        },  
        {  
            "function":"last",  
            "inputPath":".systemProperties", 
            "outputPath": ".systemProperties" 
        } 
    ] 
}

A configuração define um estágio agregado que calcula, ao longo de uma janela de dez segundos:

  • Temperatura média
  • Soma da temperatura
  • Soma da pressão

Exemplo

Este exemplo inclui duas mensagens de entrada de exemplo e uma mensagem de saída de exemplo gerada usando a configuração anterior:

Mensagem de entrada 1:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 10, 
        "temperature":250, 
        "pressure":30, 
        "runningState": true 
    } 
} 

Mensagem de entrada 2:

{ 
    "systemProperties":{ 
        "partitionKey":"foo", 
        "partitionId":5, 
        "timestamp":"2023-01-11T10:02:07Z" 
    }, 
    "qos":1, 
    "topic":"/assets/foo/tags/bar", 
    "properties":{ 
        "responseTopic":"outputs/foo/tags/bar", 
        "contentType": "application/json" 
    }, 
    "payload":{ 
        "humidity": 11, 
        "temperature":235, 
        "pressure":25, 
        "runningState": true 
    } 
} 

Mensagem de saída:

{ 
    "systemProperties":{  
        "partitionKey":"foo",  
        "partitionId":5,  
        "timestamp":"2023-01-11T10:02:07Z"  
    }, 
    "payload":{ 
        "temperature_avg":242.5, 
        "temperature_all":[250,235], 
        "pressure":27.5 
    } 
}