Ingerir dados de amostra formatados em JSON no Azure Data Explorer
Artigo
Este artigo mostra como ingerir dados JSON formatados em um banco de dados do Azure Data Explorer. Você começará com exemplos simples de JSON bruto e mapeado, continuará para JSON com várias linhas e, em seguida, lidará com esquemas JSON mais complexos que contêm matrizes e dicionários. Os exemplos detalham o processo de ingestão de dados formatados em JSON usando KQL (Kusto Query Language), C# ou Python.
Azure Data Explorer dá suporte a dois formatos de arquivo JSON:
json: JSON separado por linha. Cada linha nos dados de entrada tem exatamente um registro JSON. Esse formato dá suporte à análise de comentários e propriedades entre aspas simples. Para obter mais informações, consulte Linhas do JSON.
multijson: JSON de várias linhas. O analisador ignora os separadores de linha e lê um registro da posição anterior até o final de um JSON válido.
Observação
Ao ingerir usando a experiência de obtenção de dados, o formato padrão é multijson. O formato pode lidar com registros JSON de várias linhas e matrizes de registros JSON. Quando um erro de análise é encontrado, todo o arquivo é descartado. Para ignorar registros JSON inválidos, selecione a opção "Ignorar erros de formato de dados.", que mudará o formato para json (Linhas JSON).
Se você estiver usando o formato de linha JSON (json), as linhas que não representam registros JSON válidos serão ignoradas durante a análise.
Ingerir e mapear dados formatados em JSON
A ingestão de dados formatados em JSON exige que você especifique o formato usando a propriedade de ingestão. A ingestão de dados JSON requer mapeamento, que mapeia uma entrada de origem JSON para sua coluna de destino. Ao ingerir dados, use a propriedade IngestionMapping com a propriedade de ingestão ingestionMappingReference (para um mapeamento predefinido) ou a propriedade IngestionMappings. Este artigo usará a propriedade de ingestão ingestionMappingReference, que é predefinida na tabela usada para ingestão. Nos exemplos abaixo, vamos começar ingerindo registros JSON como dados brutos em uma única tabela de colunas. Em seguida, vamos usar o mapeamento para ingerir cada propriedade para sua coluna mapeada.
Exemplo de JSON simples
O exemplo a seguir é um JSON simples, com uma estrutura plana. Os dados têm informações de temperatura e umidade, coletadas por vários dispositivos. Cada registro é marcado com uma ID e um carimbo de data/hora.
Neste exemplo, você ingere registros JSON como dados brutos em uma única tabela de colunas. A manipulação de dados, usando consultas e a política de atualização é feita depois que os dados são ingeridos.
Esse comando cria um mapeamento e mapeia o caminho raiz JSON $ para a coluna Event.
Faça a ingestão de dados na tabela RawEvents.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos usar essa tabela para todos os exemplos e comandos de ingestão a seguir.
Nesse mapeamento, conforme definido pelo esquema de tabela, as entradas timestampserão ingeridas na coluna Time como tipos de dados datetime.
Faça a ingestão de dados na tabela Events.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
O arquivo 'simple.json' tem alguns registros JSON separados por linha. O formato é json e o mapeamento usado no comando de ingestão é o FlatEventMapping que você criou.
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos usar essa tabela para todos os exemplos e comandos de ingestão a seguir.
Nesse mapeamento, conforme definido pelo esquema de tabela, as entradas timestampserão ingeridas na coluna Time como tipos de dados datetime.
Faça a ingestão de dados na tabela Events.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
O arquivo 'simple.json' tem alguns registros JSON separados por linha. O formato é json e o mapeamento usado no comando de ingestão é o FlatEventMapping que você criou.
Crie uma nova tabela, com um esquema semelhante aos dados de entrada JSON. Vamos usar essa tabela para todos os exemplos e comandos de ingestão a seguir.
O arquivo 'simple.json' tem alguns registros JSON separados por linha. O formato é json e o mapeamento usado no comando de ingestão é o FlatEventMapping que você criou.
Ingerir registros JSON de várias linhas
Neste exemplo, você ingere registros JSON de várias linhas. Cada propriedade JSON é mapeada para uma única coluna na tabela. O arquivo 'multilined.json' tem alguns registros JSON recuados. O formato multijson indica a leitura de registros pela estrutura JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Faça a ingestão de dados na tabela Events.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Tipos de dados de matriz são uma coleção ordenada de valores. A ingestão de uma matriz JSON é feita por uma política de atualização. O JSON é ingerido no estado em que se está em uma tabela intermediária. Uma política de atualização executa uma função predefinida na tabela RawEvents, testando novamente os resultados para a tabela de destino. Vamos ingerir dados com a seguinte estrutura:
Crie uma função update policy que expanda a coleção de records para que cada valor na coleção receba uma linha separada, usando o operador mv-expand. Vamos usar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
O esquema recebido pela função deve corresponder ao esquema da tabela de destino. Use o operador getschema para examinar o esquema.
EventRecordsExpand() | getschema
Adicione a política de atualização à tabela de destino. Essa política executará a consulta automaticamente nos dados recém-ingeridos na tabela de dados intermediária RawEvents e ingerirá os resultados na tabela Events. Defina uma política de retenção zero para evitar a persistência da tabela intermediária.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Examine os dados na tabela Events.
Events
Crie uma função de atualização que expanda a coleção de records para que cada valor na coleção receba uma linha separada, usando o operador mv-expand. Vamos usar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Observação
O esquema recebido pela função deve corresponder ao esquema da tabela de destino.
Adicione a política de atualização à tabela de destino. Essa política executará a consulta automaticamente nos dados recém-ingeridos na tabela de dados intermediária RawEvents e ingerirá os resultados na tabela Events. Defina uma política de retenção zero para evitar a persistência da tabela intermediária.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Examine os dados na tabela Events.
Crie uma função de atualização que expanda a coleção de records para que cada valor na coleção receba uma linha separada, usando o operador mv-expand. Vamos usar a tabela RawEvents como uma tabela de origem e Events como uma tabela de destino.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Observação
O esquema recebido pela função precisa corresponder ao esquema da tabela de destino.
Adicione a política de atualização à tabela de destino. Essa política executará a consulta automaticamente nos dados recém-ingeridos na tabela de dados intermediária RawEvents e ingerirá os resultados na tabela Events. Defina uma política de retenção zero para evitar a persistência da tabela intermediária.