Condividi tramite


Inserire dati con Kusto Java SDK

Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. La libreria client Java può essere usata per inserire dati, eseguire comandi di gestione dei problemi ed eseguire query sui dati nei cluster di Azure Esplora dati.

Questo articolo illustra come inserire dati usando la libreria Java di Azure Esplora dati. Prima di tutto, si creerà una tabella e un mapping dei dati in un cluster di test. Si accoderà quindi un inserimento dall'archivio BLOB al cluster usando Java SDK e si convalidano i risultati.

Prerequisiti

Esaminare il codice

Questa sezione è facoltativa. Esaminare i frammenti di codice seguenti per informazioni sul funzionamento del codice. Per ignorare questa sezione, passare a eseguire l'applicazione.

Autenticazione

Il programma usa le credenziali di autenticazione di Microsoft Entra con ConnectionStringBuilder'.

  1. Creare un oggetto com.microsoft.azure.kusto.data.Client per query e gestione.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Creare e usare per com.microsoft.azure.kusto.ingest.IngestClient accodare l'inserimento dei dati in Azure Esplora dati:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Comandi di gestione

I comandi di gestione, ad esempio .drop e .create, vengono eseguiti chiamando execute su un com.microsoft.azure.kusto.data.Client oggetto .

Ad esempio, la StormEvents tabella viene creata nel modo seguente:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Inserimento dati

Accodamento tramite un file da un contenitore di Archiviazione BLOB di Azure esistente.

  • Usare BlobSourceInfo per specificare il percorso di archiviazione BLOB.
  • Usare IngestionProperties per definire tabelle, database, nome di mapping e tipo di dati. Nell'esempio seguente il tipo di dati è CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Il processo di inserimento viene avviato in un thread separato e il main thread attende il completamento del thread di inserimento. Questo processo usa CountdownLatch. L'API di inserimento (IngestClient#ingestFromBlob) non è asincrona. Un while ciclo viene usato per eseguire il polling dello stato corrente ogni 5 secondi e attende che lo stato di inserimento cambi da Pending a uno stato diverso. Lo stato finale può essere Succeeded, Failedo PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Suggerimento

Esistono altri metodi per gestire l'inserimento in modo asincrono per applicazioni diverse. Ad esempio, è possibile usare CompletableFuture per creare una pipeline che definisce l'azione dopo l'inserimento, ad esempio eseguire una query sulla tabella o gestire le eccezioni segnalate a IngestionStatus.

Eseguire l'applicazione

Generali

Quando si esegue il codice di esempio, vengono eseguite le azioni seguenti:

  1. Elimina tabella: StormEvents la tabella viene eliminata (se esistente).
  2. Creazione tabella: StormEvents viene creata la tabella.
  3. Creazione del mapping: StormEvents_CSV_Mapping viene creato il mapping.
  4. Inserimento di file: un file CSV (in Archiviazione BLOB di Azure) viene accodato per l'inserimento.

Il codice di esempio seguente proviene da App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Suggerimento

Per provare diverse combinazioni di operazioni, rimuovere il commento/impostare come commento i rispettivi metodi in App.java.

Eseguire l'applicazione

  1. Clonare il codice di esempio da GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Impostare le informazioni sull'entità servizio con le informazioni seguenti come variabili di ambiente usate dal programma:

    • Endpoint cluster
    • Nome database
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Compilare ed eseguire:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    L'output sarà analogo al seguente:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Attendere alcuni minuti per il completamento del processo di inserimento. Al termine, verrà visualizzato il messaggio di log seguente: Ingestion completed successfully. A questo punto è possibile uscire dal programma e passare al passaggio successivo senza influire sul processo di inserimento, che è già stato accodato.

Convalida

Attendere da cinque a 10 minuti per l'inserimento in coda per pianificare il processo di inserimento e caricare i dati in Azure Esplora dati.

  1. Accedere al https://dataexplorer.azure.com e connettersi al cluster.

  2. Eseguire il comando seguente per ottenere il conteggio dei record nella StormEvents tabella:

    StormEvents | count
    

Risoluzione dei problemi

  1. Per visualizzare gli errori di inserimento nelle ultime quattro ore, eseguire il comando seguente nel database:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore, eseguire il comando seguente:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Pulire le risorse

Se non si prevede di usare le risorse create, eseguire il comando seguente nel database per eliminare la StormEvents tabella.

.drop table StormEvents