Ingérer des données à l’aide du Kit de développement logiciel (SDK) Kusto .NET
Il existe deux bibliothèques de client pour.NET : une bibliothèque d’ingestion et une bibliothèque de données. Pour plus d’informations sur le SDK .NET, consultez À propos du SDK .NET. Ces bibliothèques vous permettent d’ingérer (charger) des données dans un cluster et d’interroger les données de votre code. Dans cet article, vous allez d’abord créer une table et un mappage de données dans un cluster de test. Ensuite, vous allez mettre en file d’attente l’ingestion sur le cluster et valider les résultats.
Prérequis
- Un compte Microsoft ou une identité utilisateur Microsoft Entra. Un abonnement Azure n’est pas requis.
- Un cluster et une base de données. Créez un cluster et une base de données.
Installer la bibliothèque d’ingestion
Install-Package Microsoft.Azure.Kusto.Ingest
Ajouter une authentification et construire une chaîne de connexion
Authentification
Pour authentifier une application, le Kit de développement logiciel (SDK) utilise votre ID de locataire Microsoft Entra. Pour trouver votre ID de locataire, utilisez l’URL suivante en remplaçant YourDomain par votre domaine.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Par exemple, si votre domaine est contoso.com, l’URL est : https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Cliquez sur cette URL pour voir les résultats. La première ligne est la suivante.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
Ici, l’ID de locataire est 6babcaad-604b-40ac-a9d7-9fd97c0b779f
.
Cet exemple utilise une authentification d’utilisateur Microsoft Entra interactive pour accéder au cluster. Vous pouvez également utiliser l’authentification d’application Microsoft Entra avec un certificat ou un secret d’application. Veillez à définir les valeurs appropriées pour tenantId
et clusterUri
avant d’exécuter ce code.
Le SDK offre un moyen pratique de configurer la méthode d’authentification dans la chaîne de connexion. Pour obtenir une documentation complète sur les chaînes de connexion, consultez Chaînes de connexion.
Remarque
La version actuelle du SDK ne prend pas en charge l’authentification utilisateur interactive sur .NET Core. Si nécessaire, utilisez le nom d’utilisateur/mot de passe Microsoft Entra ou l’authentification d’application à la place.
Construire la chaîne de connexion
Vous pouvez maintenant construire la chaîne de connexion. Vous allez créer la table de destination et le mappage dans une étape ultérieure.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Définir les informations du fichier source
Définissez le chemin du fichier source. Cet exemple utilise un exemple de fichier hébergé sur Stockage Blob Azure. L’exemple de jeu de données StormEvents contient des données météorologiques des National Centers for Environmental Information.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Créer une table sur votre cluster de test
Créez une table nommée StormEvents
qui correspond au schéma des données dans le fichier StormEvents.csv
.
Conseil
Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Définir le mappage d’ingestion
Mappez les données CSV entrantes aux noms de colonnes utilisés lors de la création de la table. Provisionnez un objet de mappage de colonne CSV sur cette table.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Définir la stratégie de traitement par lots pour votre table
Le traitement par lot des données entrantes optimise la taille de partition de données qui est contrôlée par la stratégie de traitement par lot de l’ingestion. Modifiez la stratégie avec la commande de gestion de la stratégie de traitement par lot de l’ingestion. Utilisez cette stratégie pour réduire la latence des données dont l’arrivée est lente.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Nous vous recommandons de définir une valeur Raw Data Size
pour les données ingérées et de diminuer de façon incrémentielle la taille vers 250 Mo, tout en vérifiant si les performances s’améliorent.
Vous pouvez utiliser la propriété Flush Immediately
pour ignorer le traitement par lot, même si cela n’est pas recommandé pour l’ingestion à grande échelle, car cela peut entraîner des performances médiocres.
Mettre en file d’attente un message pour l’ingestion
Mettez en file d’attente un message pour extraire les données du stockage d’objets blob et ingérer les données. Une connexion est établie avec le cluster d’ingestion, et un autre client est créé pour utiliser ce point de terminaison.
Conseil
Les extraits de code suivants créent une instance d’un client pour presque tous les appels. Cela permet de rendre chaque extrait de code exécutable individuellement. En production, les instances de client sont réentrantes et doivent être conservées aussi longtemps que nécessaire. Une seule instance de client par URI suffit, même quand vous utilisez plusieurs bases de données (la base de données peut être spécifiée au niveau de la commande).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Vérifier que les données ont été ingérées dans la table
Attendez cinq à dix minutes avant que l’ingestion en file d’attente planifie l’ingestion et charge les données dans votre cluster. Exécutez ensuite le code suivant pour obtenir le nombre d’enregistrements de la table StormEvents
.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Exécuter des requêtes de dépannage
Connectez-vous à https://dataexplorer.azure.com et à votre cluster. Exécutez la commande suivante dans votre base de données pour voir si des échecs d’ingestion se sont produits ces quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Exécutez la commande suivante pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures. Remplacez le nom de la base de données avant l’exécution.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Nettoyer les ressources
Si vous envisagez de suivre nos autres articles, conservez les ressources que vous avez créées. Dans le cas contraire, exécutez la commande suivante dans votre base de données pour nettoyer la table StormEvents
.
.drop table StormEvents