Condividi tramite


Creare un'app per ottenere dati usando l'inserimento in coda

Si applica a: ✅Microsoft FabricEsplora dati di Azure

Kusto è in grado di gestire l'assunzione di dati di massa ottimizzando e raggruppando i dati inseriti tramite la gestione batch. La gestione batch aggrega i dati inseriti prima che raggiunga la tabella di destinazione, consentendo un'elaborazione più efficiente e prestazioni migliorate. La creazione di batch viene in genere eseguita in blocchi di 1 GB di dati non elaborati, 1000 singoli file o per un timeout di default di 5 minuti. I criteri di invio in batch possono essere aggiornati a livello di database e tabelle, in genere per ridurre il tempo di invio in batch e ridurre la latenza. Per ulteriori informazioni sull'inserimento in batch, vedere la politica di IngestionBatching e modificare la politica di inserimento in batch a livello di tabella .

Nota

L'invio in batch tiene conto anche di vari fattori, ad esempio il database e la tabella di destinazione, l'utente che esegue l'inserimento e varie proprietà associate all'inserimento, ad esempio tag speciali.

Questo articolo illustra come:

Prerequisiti

Prima di iniziare

  • Usare uno dei seguenti metodi per creare la tabella MyStormEvents e, poiché viene inserita solo una piccola quantità di dati, impostare il timeout dei criteri di batching di inserimento su 10 secondi.

    1. Creare una tabella di destinazione denominata MyStormEvents nel database eseguendo la prima app nei comandi di gestione .
    2. Nei comandi di gestione , impostare il timeout della politica di inserimento in batch su 10 secondi eseguendo la seconda app. Prima di eseguire l'app, modificare il valore di timeout in 00:00:10.

    Nota

    La propagazione delle nuove impostazioni dei criteri di batch al responsabile di batch può richiedere alcuni minuti.

  • Scaricare il file di dati di esempio stormevent.csv. Il file contiene 1.000 record di eventi legati a tempeste.

Nota

Gli esempi seguenti presuppongono una corrispondenza semplice tra le colonne dei dati inseriti e lo schema della tabella di destinazione. Se i dati inseriti non corrispondono in modo semplice allo schema della tabella, è necessario usare un mapping di inserimento per allineare le colonne dei dati allo schema della tabella.

Mettere in coda un file per l'ingestione e interrogare i risultati

Nel tuo IDE o nell'editor di testo preferito, crea un progetto o un file chiamato inserimento di base usando la convenzione appropriata per il linguaggio di programmazione preferito. Posizionare il file stormevent.csv nella stessa posizione dell'app.

Nota

Negli esempi seguenti si usano due client, uno per eseguire query sul cluster e l'altro per inserire i dati nel cluster. Per le lingue in cui la libreria client lo supporta, entrambi i client condividono lo stesso autenticatore della richiesta utente, generando una singola richiesta utente anziché una per ogni client.

Aggiungere il codice seguente:

  1. Crea un'app client che si connette al cluster e stampa il numero di righe nella tabella MyStormEvents. Questo conteggio verrà usato come baseline per il confronto con il numero di righe dopo ogni metodo di inserimento. Sostituire rispettivamente i segnaposto <your_cluster_uri> e <your_database> con l'URI del cluster e il nome del database.

    • C#
    • Python
    • TypeScript
    • Java
    using Kusto.Data;
    using Kusto.Data.Net.Client;
    
    namespace BatchIngest {
      class BatchIngest {
        static void Main(string[] args) {
          string clusterUri = "<your_cluster_uri>";
          var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
            .WithAadUserPromptAuthentication();
    
          using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
            string database = "<your_database>";
            string table = "MyStormEvents";
    
            string query = table + " | count";
            using (var response = kustoClient.ExecuteQuery(database, query, null)) {
              Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
              PrintResultsAsValueList(response);
            }
          }
        }
    
        static void PrintResultsAsValueList(IDataReader response) {
          string value;
          while (response.Read()) {
            for (int i = 0; i < response.FieldCount; i++) {
              value = "";
              if (response.GetDataTypeName(i) == "Int32")
                  value = response.GetInt32(i).ToString();
              else if (response.GetDataTypeName(i) == "Int64")
                value = response.GetInt64(i).ToString();
              else if (response.GetDataTypeName(i) == "DateTime")
                value = response.GetDateTime(i).ToString();
              else if (response.GetDataTypeName(i) == "Object")
                value = response.GetValue(i).ToString() ?? "{}";
              else
                value = response.GetString(i);
    
              Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
          }
        }
      }
    }
    
  2. Creare un oggetto generatore di stringhe di connessione che definisce l'URI di inserimento dati, se possibile, usando la condivisione delle stesse credenziali di autenticazione dell'URI del cluster. Sostituire il segnaposto <your_ingestion_uri> con l'URI di inserimento dati.

    using Kusto.Data.Common;
    using Kusto.Ingest;
    using System.Data;
    
    string ingestUri = "<your_ingestion_uri>";
    var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
      .WithAadUserPromptAuthentication();
    
  3. Inserire il file stormevent.csv aggiungendolo alla coda di elaborazione. Vengono usati gli oggetti e le proprietà seguenti:

    • QueuedIngestClient per creare il client di inserimento.
    • IngestionProperties per impostare le proprietà di inserimento.
    • DataFormat per specificare il formato di file come CSV.
    • ignore_first_record per specificare se la prima riga nei file di tipo CSV e simili viene ignorata, utilizzando la logica seguente:
      • True: la prima riga viene ignorata. Usare questa opzione per eliminare la riga di intestazione dai dati testuali tabulari.
      • False: la prima riga viene inserita come riga normale.

    Nota

    L'ingestione supporta una dimensione massima del file di 6 GB. È consigliabile inserire file compresi tra 100 MB e 1 GB.

    • C#
    • Python
    • TypeScript
    • Java
    using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
      string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
    
      Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
      var ingestProps = new KustoIngestionProperties(database, table) {
        Format = DataSourceFormat.csv,
        AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
      };
      _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
    }
    
  4. Eseguire una query sul numero di righe nella tabella dopo l'inserimento del file e visualizzare l'ultima riga inserita.

    Nota

    Per consentire il completamento dell'inserimento, attendere 30 secondi prima di interrogare la tabella. Per C#, attendere 60 secondi per consentire di aggiungere asincronamente il file alla coda di inserimento.

    Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
    Thread.Sleep(TimeSpan.FromSeconds(60));
    
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
      PrintResultsAsValueList(response);
    }
    
    query = table + " | top 1 by ingestion_time()";
    using (var response = kustoClient.ExecuteQuery(database, query, null)) {
      Console.WriteLine("\nLast ingested row:");
      PrintResultsAsValueList(response);
    }
    

Il codice completo dovrebbe essere simile al seguente:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      string clusterUri = "<your_cluster_uri>";
      var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
        .WithAadUserPromptAuthentication();
      string ingestUri = "<your_ingestion_uri>";
      var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
        .WithAadUserPromptAuthentication();


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";
        string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");

        string query = table + " | count";
        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
          PrintResultsAsValueList(response);
        }

        Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
        var ingestProps = new KustoIngestionProperties(database, table) {
          Format = DataSourceFormat.csv,
          AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
        };
        _= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;

        Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
        Thread.Sleep(TimeSpan.FromSeconds(60));

        using (var response = kustoClient.ExecuteQuery(database, query, null)) {
          Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
          PrintResultsAsValueList(response);
        }

        query = table + " | top 1 by ingestion_time()";
        using (var response = kustoClient.ExecuteQuery(database, query, null))
        {
          Console.WriteLine("\nLast ingested row:");
          PrintResultsAsValueList(response);
        }
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      while (response.Read()) {
        for (int i = 0; i < response.FieldCount; i++) {
          if (response.GetDataTypeName(i) == "Int64")
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetInt64(i));
          else
            Console.WriteLine("\t{0} - {1}", response.GetName(i), response.IsDBNull(i) ? "None" : response.GetString(i));
        }
      }
    }
  }
}

Esegui l'app

Nel prompt dei comandi, utilizza il comando seguente per eseguire l'app:

# Change directory to the folder that contains the management commands project
dotnet run .

Verrà visualizzato un risultato simile al seguente:

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 0

Ingesting data from file: 
        C:\MyApp\stormevents.csv

Waiting 30 seconds for ingestion to complete

Number of rows in MyStormEvents AFTER ingesting the file:
         Count - 1000

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Accoda i dati in memoria per l'inserimento ed eseguire query sui risultati

È possibile inserire dati dalla memoria creando un flusso contenente i dati e quindi accodandoli per l'inserimento.

Ad esempio, è possibile modificare l'app sostituendo l'inserimento dal file codice, come indicato di seguito:

  1. Aggiungere il pacchetto del descrittore del flusso alle importazioni all'inizio del file.

    Non sono necessari pacchetti aggiuntivi.

  2. Aggiungere una stringa in memoria con i dati da inserire.

    string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
    var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));
    
  3. Impostare le proprietà di inserimento per non ignorare il primo record, poiché la stringa presente in memoria non ha una riga di intestazione.

    ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
    
  4. Inserire i dati nella memoria aggiungendoli alla coda batch. Se possibile, specificare le dimensioni dei dati non elaborati.

    • C#
    • Python
    • TipoScript
    • Java
    _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;
    

Una struttura del codice aggiornato dovrebbe essere simile alla seguente:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\"";
      var stringStream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(singleLine));

      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "False" }};
        _= ingestClient.IngestFromStreamAsync(stringStream, ingestProps, new StreamSourceOptions {Size = stringStream.Length}).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Quando si esegue l'app, verrà visualizzato un risultato simile al seguente. Si noti che dopo l'inserimento, il numero di righe nella tabella è aumentato di uno.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1000

Ingesting data from memory:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from memory:
         Count - 1001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Mettere in coda un blob per l'inserimento e interrogare i risultati

È possibile inserire dati da BLOB di Archiviazione di Azure, file di Azure Data Lake e file Amazon S3.

Ad esempio, è possibile modificare l'app sostituendo il codice relativo all'ingestione dalla memoria con il seguente codice:

  1. Per iniziare, carica il file stormevent.csv nell'account di archiviazione e genera un URI con autorizzazioni di lettura, ad esempio, usando un token di accesso condiviso per i blob di Azure.

  2. Aggiungi il pacchetto descrittore BLOB alle importazioni in cima al file.

    • C#
    • Python
    • TypeScript
    • Java

    Non sono necessari pacchetti aggiuntivi.

  3. Creare un descrittore blob usando l'URI del blob, impostare le proprietà di ingestion e successivamente inserire i dati dal blob. Sostituire il segnaposto <your_blob_uri> con l'URI del BLOB.

    string blobUri = "<your_blob_uri>";
    
    ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
    _= ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;
    

Una struttura del codice aggiornato dovrebbe essere simile alla seguente:

using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;

namespace BatchIngest {
  class BatchIngest {
    static void Main(string[] args) {
      ...
      string blobUri = "<your_blob_uri>";


      using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
      using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
        string database = "<your_database>";
        string table = "MyStormEvents";

        ...

        Console.WriteLine("\nIngesting data from memory:");
        ingestProps.AdditionalProperties = new Dictionary<string, string>() { { "ignoreFirstRecord", "True" } };
        _=_ ingestClient.IngestFromStorageAsync(blobUri, ingestProps).Result;

        ...
      }
    }

    static void PrintResultsAsValueList(IDataReader response) {
      ...
    }
  }
}

Quando si esegue l'app, verrà visualizzato un risultato simile al seguente. Si noti che dopo l'inserimento, il numero di righe nella tabella è aumentato di 1.000.

Number of rows in MyStormEvents BEFORE ingestion:
         Count - 1001

Ingesting data from a blob:

Waiting 30 seconds for ingestion to complete ...

Number of rows in MyStormEvents AFTER ingesting from a blob:
         Count - 2001

Last ingested row:
         StartTime - 2018-01-26 00:00:00+00:00
         EndTime - 2018-01-27 14:00:00+00:00
         State - MEXICO
         DamageProperty - 0
         DamageCrops - 0
         Source - Unknown
         StormSummary - {}

Passaggio successivo