Kusto è in grado di gestire l'assunzione di dati di massa ottimizzando e raggruppando i dati inseriti tramite la gestione batch. La gestione batch aggrega i dati inseriti prima che raggiunga la tabella di destinazione, consentendo un'elaborazione più efficiente e prestazioni migliorate. La creazione di batch viene in genere eseguita in blocchi di 1 GB di dati non elaborati, 1000 singoli file o per un timeout di default di 5 minuti. I criteri di invio in batch possono essere aggiornati a livello di database e tabelle, in genere per ridurre il tempo di invio in batch e ridurre la latenza. Per ulteriori informazioni sull'inserimento in batch, vedere la politica di IngestionBatching e modificare la politica di inserimento in batch a livello di tabella .
Nota
L'invio in batch tiene conto anche di vari fattori, ad esempio il database e la tabella di destinazione, l'utente che esegue l'inserimento e varie proprietà associate all'inserimento, ad esempio tag speciali.
Usare uno dei seguenti metodi per creare la tabella MyStormEvents e, poiché viene inserita solo una piccola quantità di dati, impostare il timeout dei criteri di batching di inserimento su 10 secondi.
Creare una tabella di destinazione denominata MyStormEvents nel database eseguendo la prima app nei comandi di gestione .
Nei comandi di gestione , impostare il timeout della politica di inserimento in batch su 10 secondi eseguendo la seconda app. Prima di eseguire l'app, modificare il valore di timeout in 00:00:10.
Nell'ambiente di query creare una tabella di destinazione denominata MyStormEvents nel database eseguendo la query seguente:
La propagazione delle nuove impostazioni dei criteri di batch al responsabile di batch può richiedere alcuni minuti.
Scaricare il file di dati di esempio stormevent.csv. Il file contiene 1.000 record di eventi legati a tempeste.
Nota
Gli esempi seguenti presuppongono una corrispondenza semplice tra le colonne dei dati inseriti e lo schema della tabella di destinazione.
Se i dati inseriti non corrispondono in modo semplice allo schema della tabella, è necessario usare un mapping di inserimento per allineare le colonne dei dati allo schema della tabella.
Mettere in coda un file per l'ingestione e interrogare i risultati
Nel tuo IDE o nell'editor di testo preferito, crea un progetto o un file chiamato inserimento di base usando la convenzione appropriata per il linguaggio di programmazione preferito. Posizionare il file stormevent.csv nella stessa posizione dell'app.
Nota
Negli esempi seguenti si usano due client, uno per eseguire query sul cluster e l'altro per inserire i dati nel cluster. Per le lingue in cui la libreria client lo supporta, entrambi i client condividono lo stesso autenticatore della richiesta utente, generando una singola richiesta utente anziché una per ogni client.
Aggiungere il codice seguente:
Crea un'app client che si connette al cluster e stampa il numero di righe nella tabella MyStormEvents. Questo conteggio verrà usato come baseline per il confronto con il numero di righe dopo ogni metodo di inserimento. Sostituire rispettivamente i segnaposto <your_cluster_uri> e <your_database> con l'URI del cluster e il nome del database.
using Kusto.Data;
using Kusto.Data.Net.Client;
namespace BatchIngest {
class BatchIngest {
static void Main(string[] args) {
string clusterUri = "<your_cluster_uri>";
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
.WithAadUserPromptAuthentication();
using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) {
string database = "<your_database>";
string table = "MyStormEvents";
string query = table + " | count";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " BEFORE ingestion:");
PrintResultsAsValueList(response);
}
}
}
static void PrintResultsAsValueList(IDataReader response) {
string value;
while (response.Read()) {
for (int i = 0; i < response.FieldCount; i++) {
value = "";
if (response.GetDataTypeName(i) == "Int32")
value = response.GetInt32(i).ToString();
else if (response.GetDataTypeName(i) == "Int64")
value = response.GetInt64(i).ToString();
else if (response.GetDataTypeName(i) == "DateTime")
value = response.GetDateTime(i).ToString();
else if (response.GetDataTypeName(i) == "Object")
value = response.GetValue(i).ToString() ?? "{}";
else
value = response.GetString(i);
Console.WriteLine("\t{0} - {1}", response.GetName(i), value ?? "None");
}
}
}
}
from azure.identity import InteractiveBrowserCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
def main():
credentials = InteractiveBrowserCredential()
cluster_uri = "<your_cluster_uri>"
cluster_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(cluster_uri, credentials)
with KustoClient(cluster_kcsb) as kusto_client:
database = "<your_database>"
table = "MyStormEvents"
query = table + " | count"
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " BEFORE ingestion:")
print_result_as_value_list(response)
def print_result_as_value_list(response):
cols = (col.column_name for col in response.primary_results[0].columns)
for row in response.primary_results[0]:
for col in cols:
print("\t", col, "-", row[col])
if __name__ == "__main__":
main()
import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data";
import { InteractiveBrowserCredential } from "@azure/identity";
async function main() {
const credentials = new InteractiveBrowserCredential();
const clusterUri = "<your_cluster_uri>";
const clusterKcsb = KustoConnectionStringBuilder.withAadUserPromptAuthentication(clusterUri, credentials);
const kustoClient = new Client(clusterKcsb);
const database = "<your_database>";
const table = "MyStormEvents";
const query = table + " | count";
let response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " BEFORE ingestion:");
printResultAsValueList(response);
}
function printResultsAsValueList(response) {
let cols = response.primaryResults[0].columns;
for (row of response.primaryResults[0].rows()) {
for (col of cols)
console.log("\t", col.name, "-", row.getValueAt(col.ordinal) != null ? row.getValueAt(col.ordinal).toString() : "None")
}
}
main();
Nota
Per le app Node.js, usare InteractiveBrowserCredentialNodeOptions anziché InteractiveBrowserCredentialInBrowserOptions.
Nota
Java SDK attualmente non supporta entrambi i client che condividono lo stesso autenticatore di richieste utente, generando una richiesta utente per ogni client.
Creare un oggetto generatore di stringhe di connessione che definisce l'URI di inserimento dati, se possibile, usando la condivisione delle stesse credenziali di autenticazione dell'URI del cluster. Sostituire il segnaposto <your_ingestion_uri> con l'URI di inserimento dati.
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
string ingestUri = "<your_ingestion_uri>";
var ingestKcsb = new KustoConnectionStringBuilder(ingestUri)
.WithAadUserPromptAuthentication();
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties
ingest_uri = "<your_ingestion_uri>"
ingest_kcsb = KustoConnectionStringBuilder.with_azure_token_credential(ingest_uri, credentials)
using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestKcsb)) {
string filePath = Path.Combine(Directory.GetCurrentDirectory(), "stormevents.csv");
Console.WriteLine("\nIngesting data from file: \n\t " + filePath);
var ingestProps = new KustoIngestionProperties(database, table) {
Format = DataSourceFormat.csv,
AdditionalProperties = new Dictionary<string, string>() {{ "ignoreFirstRecord", "True" }}
};
_= ingestClient.IngestFromStorageAsync(filePath, ingestProps).Result;
}
import os
with QueuedIngestClient(ingest_kcsb) as ingest_client:
file_path = os.path.join(os.path.dirname(__file__), "stormevents.csv")
print("\nIngesting data from file: \n\t " + file_path)
ingest_props = IngestionProperties(database, table, DataFormat.CSV, ignore_first_record=True)
ingest_client.ingest_from_file(file_path, ingest_props)
import path from 'path';
const ingestClient = new IngestClient(ingestKcsb);
const filePath = path.join(__dirname, "stormevents.csv");
console.log("\nIngesting data from file: \n\t " + filePath);
const ingestProps = new IngestionProperties({
database: database,
table: table,
format: DataFormat.CSV,
ignoreFirstRecord: true
});
await ingestClient.ingestFromFile(filePath, ingestProps);
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
try (QueuedIngestClient ingestClient = IngestClientFactory.createClient(ingestKcsb)) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(System.getProperty("user.dir") + "\\stormevents.csv", 0);
System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString());
IngestionProperties ingestProps = new IngestionProperties(database, table);
ingestProps.setDataFormat(DataFormat.CSV);
ingestProps.setIgnoreFirstRecord(true);
ingestClient.ingestFromFile(fileSourceInfo, ingestProps);
}
Eseguire una query sul numero di righe nella tabella dopo l'inserimento del file e visualizzare l'ultima riga inserita.
Nota
Per consentire il completamento dell'inserimento, attendere 30 secondi prima di interrogare la tabella. Per C#, attendere 60 secondi per consentire di aggiungere asincronamente il file alla coda di inserimento.
Console.WriteLine("\nWaiting 60 seconds for ingestion to complete ...");
Thread.Sleep(TimeSpan.FromSeconds(60));
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nNumber of rows in " + table + " AFTER ingesting the file:");
PrintResultsAsValueList(response);
}
query = table + " | top 1 by ingestion_time()";
using (var response = kustoClient.ExecuteQuery(database, query, null)) {
Console.WriteLine("\nLast ingested row:");
PrintResultsAsValueList(response);
}
# Add this to the imports at the top of the file
import time
# Add this to the main method
print("\nWaiting 30 seconds for ingestion to complete ...")
time.sleep(30)
response = kusto_client.execute_query(database, query)
print("\nNumber of rows in " + table + " AFTER ingesting the file:")
print_result_as_value_list(response)
query = table + " | top 1 by ingestion_time()"
response = kusto_client.execute_query(database, query)
print("\nLast ingested row:")
print_result_as_value_list(response)
console.log("\nWaiting 30 seconds for ingestion to complete ...");
await sleep(30000);
response = await kustoClient.execute(database, query);
console.log("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(response);
query = table + " | top 1 by ingestion_time()"
response = await kustoClient.execute(database, query);
console.log("\nLast ingested row:");
printResultsAsValueList(response);
// Add the sleep function after the main method
function sleep(time) {
return new Promise(resolve => setTimeout(resolve, time));
}
System.out.println("\nWaiting 30 seconds for ingestion to complete ...");
Thread.sleep(30000);
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nNumber of rows in " + table + " AFTER ingesting the file:");
printResultsAsValueList(primaryResults);
query = table + " | top 1 by ingestion_time()";
response = kustoClient.execute(database, query);
primaryResults = response.getPrimaryResults();
System.out.println("\nLast ingested row:");
printResultsAsValueList(primaryResults);
Il codice completo dovrebbe essere simile al seguente:
Verrà visualizzato un risultato simile al seguente:
Number of rows in MyStormEvents BEFORE ingestion:
Count - 0
Ingesting data from file:
C:\MyApp\stormevents.csv
Waiting 30 seconds for ingestion to complete
Number of rows in MyStormEvents AFTER ingesting the file:
Count - 1000
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Accoda i dati in memoria per l'inserimento ed eseguire query sui risultati
È possibile inserire dati dalla memoria creando un flusso contenente i dati e quindi accodandoli per l'inserimento.
Ad esempio, è possibile modificare l'app sostituendo l'inserimento dal file codice, come indicato di seguito:
Aggiungere il pacchetto del descrittore del flusso alle importazioni all'inizio del file.
Quando si esegue l'app, verrà visualizzato un risultato simile al seguente. Si noti che dopo l'inserimento, il numero di righe nella tabella è aumentato di uno.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1000
Ingesting data from memory:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from memory:
Count - 1001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}
Mettere in coda un blob per l'inserimento e interrogare i risultati
È possibile inserire dati da BLOB di Archiviazione di Azure, file di Azure Data Lake e file Amazon S3.
Ad esempio, è possibile modificare l'app sostituendo il codice relativo all'ingestione dalla memoria con il seguente codice:
Per iniziare, carica il file stormevent.csv nell'account di archiviazione e genera un URI con autorizzazioni di lettura, ad esempio, usando un token di accesso condiviso per i blob di Azure.
Aggiungi il pacchetto descrittore BLOB alle importazioni in cima al file.
Creare un descrittore blob usando l'URI del blob, impostare le proprietà di ingestion e successivamente inserire i dati dal blob. Sostituire il segnaposto <your_blob_uri> con l'URI del BLOB.
Quando si esegue l'app, verrà visualizzato un risultato simile al seguente. Si noti che dopo l'inserimento, il numero di righe nella tabella è aumentato di 1.000.
Number of rows in MyStormEvents BEFORE ingestion:
Count - 1001
Ingesting data from a blob:
Waiting 30 seconds for ingestion to complete ...
Number of rows in MyStormEvents AFTER ingesting from a blob:
Count - 2001
Last ingested row:
StartTime - 2018-01-26 00:00:00+00:00
EndTime - 2018-01-27 14:00:00+00:00
State - MEXICO
DamageProperty - 0
DamageCrops - 0
Source - Unknown
StormSummary - {}