Inserire dati con Kusto Java SDK
Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. La libreria client Java può essere usata per inserire dati, eseguire comandi di gestione dei problemi ed eseguire query sui dati nei cluster di Azure Esplora dati.
Questo articolo illustra come inserire dati usando la libreria Java di Azure Esplora dati. Prima di tutto, si creerà una tabella e un mapping dei dati in un cluster di test. Si accoderà quindi un inserimento dall'archivio BLOB al cluster usando Java SDK e si convalidano i risultati.
Prerequisiti
- Un account Microsoft o un'identità utente di Microsoft Entra. Non è necessaria una sottoscrizione di Azure.
- Un cluster e un database di Esplora dati di Azure. Creare un cluster e un database.
- Git.
- JDK versione 1.8 o successiva.
- Maven.
- Creare una registrazione dell'app e concedergli le autorizzazioni per il database. Salvare l'ID client e il segreto client per usarli in un secondo momento.
Esaminare il codice
Questa sezione è facoltativa. Esaminare i frammenti di codice seguenti per informazioni sul funzionamento del codice. Per ignorare questa sezione, passare a eseguire l'applicazione.
Autenticazione
Il programma usa le credenziali di autenticazione di Microsoft Entra con ConnectionStringBuilder'.
Creare un oggetto
com.microsoft.azure.kusto.data.Client
per query e gestione.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Creare e usare per
com.microsoft.azure.kusto.ingest.IngestClient
accodare l'inserimento dei dati in Azure Esplora dati:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Comandi di gestione
I comandi di gestione, ad esempio .drop
e .create
, vengono eseguiti chiamando execute
su un com.microsoft.azure.kusto.data.Client
oggetto .
Ad esempio, la StormEvents
tabella viene creata nel modo seguente:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Inserimento dati
Accodamento tramite un file da un contenitore di Archiviazione BLOB di Azure esistente.
- Usare
BlobSourceInfo
per specificare il percorso di archiviazione BLOB. - Usare
IngestionProperties
per definire tabelle, database, nome di mapping e tipo di dati. Nell'esempio seguente il tipo di dati èCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Il processo di inserimento viene avviato in un thread separato e il main
thread attende il completamento del thread di inserimento. Questo processo usa CountdownLatch. L'API di inserimento (IngestClient#ingestFromBlob
) non è asincrona. Un while
ciclo viene usato per eseguire il polling dello stato corrente ogni 5 secondi e attende che lo stato di inserimento cambi da Pending
a uno stato diverso. Lo stato finale può essere Succeeded
, Failed
o PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Suggerimento
Esistono altri metodi per gestire l'inserimento in modo asincrono per applicazioni diverse. Ad esempio, è possibile usare CompletableFuture
per creare una pipeline che definisce l'azione dopo l'inserimento, ad esempio eseguire una query sulla tabella o gestire le eccezioni segnalate a IngestionStatus
.
Eseguire l'applicazione
Generali
Quando si esegue il codice di esempio, vengono eseguite le azioni seguenti:
- Elimina tabella:
StormEvents
la tabella viene eliminata (se esistente). - Creazione tabella:
StormEvents
viene creata la tabella. - Creazione del mapping:
StormEvents_CSV_Mapping
viene creato il mapping. - Inserimento di file: un file CSV (in Archiviazione BLOB di Azure) viene accodato per l'inserimento.
Il codice di esempio seguente proviene da App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Suggerimento
Per provare diverse combinazioni di operazioni, rimuovere il commento/impostare come commento i rispettivi metodi in App.java
.
Eseguire l'applicazione
Clonare il codice di esempio da GitHub:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Impostare le informazioni sull'entità servizio con le informazioni seguenti come variabili di ambiente usate dal programma:
- Endpoint cluster
- Nome database
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Compilare ed eseguire:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
L'output sarà analogo al seguente:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Attendere alcuni minuti per il completamento del processo di inserimento. Al termine, verrà visualizzato il messaggio di log seguente: Ingestion completed successfully
. A questo punto è possibile uscire dal programma e passare al passaggio successivo senza influire sul processo di inserimento, che è già stato accodato.
Convalida
Attendere da cinque a 10 minuti per l'inserimento in coda per pianificare il processo di inserimento e caricare i dati in Azure Esplora dati.
Accedere al https://dataexplorer.azure.com e connettersi al cluster.
Eseguire il comando seguente per ottenere il conteggio dei record nella
StormEvents
tabella:StormEvents | count
Risoluzione dei problemi
Per visualizzare gli errori di inserimento nelle ultime quattro ore, eseguire il comando seguente nel database:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore, eseguire il comando seguente:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Pulire le risorse
Se non si prevede di usare le risorse create, eseguire il comando seguente nel database per eliminare la StormEvents
tabella.
.drop table StormEvents