Compartilhar via


Criar uma aplicação para obter dados usando a ingestão em fila de espera

Aplica-se a: ✅Microsoft FabricAzure Data Explorer

Kusto é capaz de lidar com a ingestão de dados em massa, otimizando e agrupando os dados ingeridos por meio de seu gerenciador de lotes. O gestor de lotes agrega os dados ingeridos antes de atingirem a tabela de destino, permitindo um processamento mais eficiente e um melhor desempenho. O processamento em lote geralmente é feito em volumes de 1 GB de dados brutos, mil ficheiros individuais ou, por padrão, um tempo de espera de 5 minutos. As políticas de processamento em lote podem ser atualizadas nos níveis de banco de dados e tabela, geralmente para diminuir o tempo de processamento em lote e reduzir a latência. Para obter mais informações sobre o processamento em lote de ingestão, consulte política IngestionBatching e Alterar a política de lote de ingestão de nível de tabela programaticamente.

Observação

O processamento em lote também leva em conta vários fatores, como o banco de dados e a tabela de destino, o usuário que executa a ingestão e várias propriedades associadas à ingestão, como tags especiais.

Neste artigo, você aprenderá a:

Pré-requisitos

Antes de começar

  • Use um dos seguintes métodos para criar a tabela MyStormEvents e, como apenas uma pequena quantidade de dados está sendo ingerida, defina o tempo limite da política de lote de ingestão para 10 segundos:

    1. Crie uma tabela de destino chamada MyStormEvents no seu banco de dados executando a primeira aplicação em comandos de gerenciamento.
    2. Defina o tempo limite da política de lote de ingestão para 10 segundos, executando a segunda aplicação nos comandos de gestão . Antes de executar o aplicativo, altere o valor de tempo limite para 00:00:10.

    Observação

    Pode levar alguns minutos para que as novas configurações de política de lote se propaguem para o gerenciador de lotes.

  • Baixe o arquivo de dados de exemplo stormevent.csv. O arquivo contém 1.000 registros de eventos de tempestade.

Observação

Os exemplos a seguir assumem uma correspondência trivial entre as colunas dos dados ingeridos e o esquema da tabela de destino. Se os dados ingeridos não corresponderem trivialmente ao esquema da tabela, você deverá usar um mapeamento de ingestão para alinhar as colunas dos dados com o esquema da tabela.

Enfileirar um ficheiro para ingestão e consultar os resultados

Em seu IDE ou editor de texto preferido, crie um projeto ou arquivo chamado ingestão básica usando a convenção apropriada para seu idioma preferido. Coloque o arquivo stormevent.csv no mesmo local do seu aplicativo.

Observação

Nos exemplos a seguir, você usa dois clientes, um para consultar seu cluster e outro para ingerir dados em seu cluster. Para idiomas em que a biblioteca do cliente oferece suporte a ele, ambos os clientes compartilham o mesmo autenticador de prompt do usuário, resultando em um único prompt de usuário em vez de um para cada cliente.

Adicione o seguinte código:

  1. Crie uma aplicação cliente que se ligue ao seu cluster e imprima o número de linhas na tabela MyStormEvents. Você usará essa contagem como uma linha de base para comparação com o número de linhas após cada método de ingestão. Substitua os marcadores de posição <your_cluster_uri> e <your_database> pelo URI do cluster e pelo nome do banco de dados, respectivamente.

    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Crie um objeto construtor de cadeia de conexão que defina o URI de ingestão de dados, sempre que possível, usando o compartilhamento das mesmas credenciais de autenticação que o URI do cluster. Substitua o espaço reservado <your_ingestion_uri> pelo URI de ingestão de dados.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Ingerir o arquivo stormevent.csv adicionando-o à fila de lotes. Você usa os seguintes objetos e propriedades:

    • QueuedIngestClient para criar o cliente de ingestão.
    • IngestionProperties para definir as propriedades de ingestão.
    • DataFormat especificar o formato de arquivo como CSV.
    • ignore_first_record para especificar se a primeira linha de um arquivo CSV e de tipos de ficheiros semelhantes é ignorada, usando a seguinte lógica:
      • True: A primeira linha é ignorada. Use esta opção para remover a linha de cabeçalho dos dados tabulares textuais.
      • Falso: A primeira linha é ingerida como uma linha regular.

    Observação

    A ingestão suporta um tamanho máximo de ficheiro de 6 GB. A recomendação é ingerir arquivos entre 100 MB e 1 GB.

    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Consulte o número de linhas na tabela após a ingestão do arquivo e mostre a última linha ingerida.

    Observação

    Para dar tempo para que a ingestão seja concluída, aguarde 30 segundos antes de consultar a tabela. Para C#, aguarde 60 segundos para dar tempo de adicionar o arquivo à fila de ingestão de forma assíncrona.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

O código completo deve ter esta aparência:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Executar seu aplicativo

Em um shell de comando, use o seguinte comando para executar seu aplicativo:

# Change directory to the folder that contains the management commands project
dotnet run .

Deverá ver um resultado semelhante ao seguinte:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Enfileirar dados na memória para ingestão e consultar os resultados

Você pode ingerir dados da memória criando um fluxo contendo os dados e, em seguida, enfileirando-os para ingestão.

Por exemplo, você pode modificar o aplicativo substituindo a ingestão de do código de de arquivo, da seguinte maneira:

  1. Adicione o pacote do descritor de fluxo às importações na parte superior do arquivo.

    Não são necessários pacotes adicionais.

  2. Adicione uma cadeia de caracteres na memória com os dados a serem ingeridos.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Defina as propriedades de ingestão para não ignorar o primeiro registro, pois a cadeia de caracteres na memória não tem uma linha de cabeçalho.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Ingerir os dados na memória adicionando-os à fila de lotes. Sempre que possível, forneça o tamanho dos dados brutos.

    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

Um esboço do código atualizado deve ter esta aparência:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em um.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Colocar um blob em fila para processamento e consultar os resultados

Você pode ingerir dados de blobs de Armazenamento do Azure, arquivos do Azure Data Lake e arquivos do Amazon S3.

Por exemplo, poderá modificar a aplicação substituindo o componente de ingestão do código de memória pelo seguinte:

  1. Comece carregando o arquivo stormevent.csv em sua conta de armazenamento e gere um URI com permissões de leitura, por exemplo, usando um token SAS para blobs do Azure.

  2. Adicione o pacote descritor de blob às importações na parte superior do arquivo.

    Não são necessários pacotes adicionais.

  3. Crie um descritor de blob usando o URI de blob, defina as propriedades de ingestão e, em seguida, ingira dados do blob. Substitua o marcador <your_blob_uri> pelo URI do blob.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

Um esboço do código atualizado deve ter esta aparência:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Ao executar o aplicativo, você verá um resultado semelhante ao seguinte. Observe que, após a ingestão, o número de linhas na tabela aumentou em 1.000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Próximo passo