Ingérer des données à l’aide du Kit de développement logiciel (SDK) Java Kusto
L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutive pour les données des journaux et les données de télémétrie. La bibliothèque cliente Java peut être utilisée pour ingérer des données, émettre des commandes de gestion et interroger des données dans des clusters Azure Data Explorer.
Dans cet article, vous allez découvrir comment ingérer des données à l’aide de la bibliothèque Java d’Azure Data Explorer. Tout d’abord, vous allez créer une table et un mappage de données dans un cluster de test. Ensuite, vous effectuerez la mise en file d’attente d’une ingestion du stockage d’objets blob vers le cluster à l’aide du kit SDK Java, et vous validerez les résultats.
Prérequis
- Un compte Microsoft ou une identité utilisateur Microsoft Entra. Un abonnement Azure n’est pas requis.
- Un cluster et une base de données Azure Data Explorer. Créez un cluster et une base de données.
- Git.
- JDK version 1.8 ou ultérieure
- Maven.
- Créez une inscription d’application et accordez-lui des autorisations sur la base de données. Enregistrez l’ID de client et le secret du client pour une utilisation ultérieure.
Vérifier le code
Cette section est facultative. Passez en revue les extraits de code suivants pour découvrir comment fonctionne le code. Pour ignorer cette section, accédez à Exécuter l’application.
Authentification
Le programme utilise les informations d’identification d’authentification Microsoft Entra avec ConnectionStringBuilder'.
Créez un
com.microsoft.azure.kusto.data.Client
pour la requête et la gestion.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Créez et utilisez un
com.microsoft.azure.kusto.ingest.IngestClient
pour la mise en file d’attente de l’ingestion des données dans Azure Data Explorer :static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Commandes de gestion
Les commandes de gestion, telles que .drop
et , sont exécutées en appelant execute
un com.microsoft.azure.kusto.data.Client
.create
objet.
Par exemple, la table StormEvents
est créée comme suit :
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Ingestion des données
Placez l’ingestion en file d’attente à l’aide d’un fichier provenant d’un conteneur Stockage Blob Azure existant.
- Utilisez
BlobSourceInfo
pour spécifier le chemin du Stockage Blob. - Utilisez
IngestionProperties
pour définir la table, la base de données, le nom du mappage et le type de données. Dans l’exemple suivant, le type de données estCSV
.
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Le processus d’ingestion démarre dans un thread distinct, et le thread main
attend que le thread d’ingestion se termine. Ce processus utilise CountdownLatch. L’API d’ingestion (IngestClient#ingestFromBlob
) n’est pas asynchrone. Une boucle while
est utilisée pour interroger l’état actuel tous les cinq secondes, et attend que l’état d’ingestion passe de Pending
à un état différent. L’état final peut être Succeeded
, Failed
ou PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Conseil
Il existe d’autres méthodes pour gérer l’ingestion asynchrone pour différentes applications. Par exemple, vous pouvez utiliser CompletableFuture
pour créer un pipeline définissant l’action post-ingestion, telle que l’interrogation de la table, ou pour gérer les exceptions qui ont été signalées à IngestionStatus
.
Exécution de l'application
Général
Lorsque vous exécutez l’exemple de code, les actions suivantes sont effectuées :
- Supprimer une table : la table
StormEvents
est supprimée (si elle existe). - Création de la table : la table
StormEvents
est créée. - Création du mappage : le mappage
StormEvents_CSV_Mapping
est créé. - Ingestion de fichier : un fichier CSV (dans Stockage Blob Azure) est mis en file d’attente pour l’ingestion.
L’exemple de code suivant provient de App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Conseil
Pour tester différentes combinaisons d’opérations, commentez/décommentez les méthodes respectives dans App.java
.
Exécution de l'application
Clonez l’exemple de code depuis GitHub :
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Définissez les informations du principal de service avec les informations suivantes en tant que variables d’environnement utilisées par le programme :
- Point de terminaison de cluster
- Nom de la base de données
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Générez et exécutez :
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
La sortie doit ressembler à ceci :
Table dropped Table created Mapping created Waiting for ingestion to complete...
Attendez quelques minutes que le processus d’ingestion se termine. Une fois l’opération terminée, le message de journal suivant s’affiche : Ingestion completed successfully
. Vous pouvez quitter le programme à ce stade et passer à l’étape suivante sans affecter le processus d’ingestion, qui a déjà été mis en file d’attente.
Valider
Attendez cinq à dix minutes que l’ingestion en file d’attente planifie le processus d’ingestion et charge les données dans Azure Data Explorer.
Connectez-vous à https://dataexplorer.azure.com et à votre cluster.
Exécutez la commande suivante pour obtenir le nombre d’enregistrements de la table
StormEvents
:StormEvents | count
Résolution des problèmes
Pour voir les échecs d’ingestion des quatre dernières heures, exécutez la commande suivante sur votre base de données :
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures, exécutez la commande suivante :
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Nettoyer les ressources
Si vous ne prévoyez pas d’utiliser les ressources que vous avez créées, exécutez la commande suivante dans votre base de données pour supprimer la table StormEvents
.
.drop table StormEvents