Kusto .NET SDK를 사용하여 데이터 수집
.NET용 클라이언트 라이브러리에는 수집 라이브러리와 데이터 라이브러리의 두 가지가 있습니다. .NET SDK에 대한 자세한 내용은 .NET SDK 정보를 참조하세요. 이러한 라이브러리를 사용하여 데이터를 클러스터로 수집(로드)하고 코드에서 데이터를 쿼리할 수 있습니다. 이 문서에서는 먼저 테스트 클러스터에서 테이블 및 데이터 매핑을 만듭니다. 그런 다음, 클러스터 큐에 수집을 넣고 결과의 유효성을 검사합니다.
필수 조건
- Microsoft 계정 또는 Microsoft Entra 사용자 ID입니다. Azure 구독이 필요하지 않습니다.
- 클러스터 및 데이터베이스 클러스터 및 데이터베이스를 만듭니다.
수집 라이브러리 설치
Install-Package Microsoft.Azure.Kusto.Ingest
인증 추가 및 연결 문자열 생성
인증
애플리케이션을 인증하기 위해 SDK는 Microsoft Entra 테넌트 ID를 사용합니다. 테넌트 ID를 찾으려면 다음 URL을 사용하여 YourDomain을 사용자 도메인으로 대체합니다.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
예를 들어 도메인이 contoso.com인 경우 URL은 https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/입니다. 결과를 보려면 이 URL을 클릭합니다. 첫 번째 줄은 다음과 같습니다.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
이 경우의 테넌트 ID는 6babcaad-604b-40ac-a9d7-9fd97c0b779f
입니다.
이 예제에서는 대화형 Microsoft Entra 사용자 인증을 사용하여 클러스터에 액세스합니다. 인증서 또는 애플리케이션 비밀과 함께 Microsoft Entra 애플리케이션 인증을 사용할 수도 있습니다. 이 코드를 실행하기 전에 tenantId
및 clusterUri
에 대한 올바른 값을 설정해야 합니다.
SDK는 연결 문자열 일부로 인증 방법을 설정하는 편리한 방법을 제공합니다. 연결 문자열 대한 전체 설명서는 연결 문자열 참조하세요.
참고 항목
현재 버전의 SDK는 .NET Core에서 대화형 사용자 인증을 지원하지 않습니다. 필요한 경우 Microsoft Entra 사용자 이름/암호 또는 애플리케이션 인증을 대신 사용합니다.
연결 문자열 구성
이제 연결 문자열 생성할 수 있습니다. 이후 단계에서 대상 테이블 및 매핑을 만듭니다.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
소스 파일 정보 설정
원본 파일의 경로를 설정합니다. 이 예제에서는 Azure Blob Storage에 호스트된 예제 파일을 사용합니다. StormEvents 샘플 데이터 세트에는 국립 환경 정보 센터의 날씨 관련 데이터가 포함되어 있습니다.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
테스트 클러스터에 테이블 만들기
StormEvents.csv
파일에 있는 데이터 스키마와 일치하는 StormEvents
라는 테이블을 만듭니다.
팁
다음 코드 조각은 거의 모든 호출에 대한 클라이언트 인스턴스를 만듭니다. 이 작업은 각 조각을 개별적으로 실행할 수 있도록 하기 위해 수행됩니다. 프로덕션에서 클라이언트 인스턴스는 재진입되며 필요한 한 유지되어야 합니다. 여러 데이터베이스로 작업하는 경우에도 URI당 단일 클라이언트 인스턴스로 충분합니다(데이터베이스는 명령 수준에서 지정할 수 있음).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
수집 매핑 정의
테이블을 만들 때 사용되는 열 이름에 들어오는 CSV 데이터를 매핑합니다. 해당 테이블에서 CSV 열 매핑 개체를 프로비전합니다.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
테이블에 대한 일괄 처리 정책 정의
수신 데이터를 일괄 처리하면 수집 일괄 처리 정책에 의해 제어되는 데이터 분할 크기가 최적화됩니다. 수집 일괄 처리 정책 관리 명령을 사용하여 정책을 수정합니다. 이 정책을 사용하여 느리게 도착하는 데이터의 대기 시간을 줄입니다.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
수집된 데이터에 대해 Raw Data Size
값을 정의하고 성능이 개선되는지 확인하면서 크기를 250MB까지 점차적으로 줄이는 것이 좋습니다.
Flush Immediately
속성을 사용하여 일괄 처리를 건너뛸 수 있지만 성능이 저하될 수 있으므로 대규모 수집에는 권장하지 않습니다.
수집을 위해 메시지를 큐에 넣음
Blob 스토리지에서 데이터를 끌어온 후 데이터에 수집하기 위해 메시지를 큐에 넣습니다. 수집 클러스터에 대한 연결이 설정되고 해당 엔드포인트와 작동하도록 다른 클라이언트가 만들어집니다.
팁
다음 코드 조각은 거의 모든 호출에 대한 클라이언트 인스턴스를 만듭니다. 이 작업은 각 조각을 개별적으로 실행할 수 있도록 하기 위해 수행됩니다. 프로덕션에서 클라이언트 인스턴스는 재진입되며 필요한 한 유지되어야 합니다. 여러 데이터베이스로 작업하는 경우에도 URI당 단일 클라이언트 인스턴스로 충분합니다(데이터베이스는 명령 수준에서 지정할 수 있음).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
데이터가 테이블에 수집되었는지 검증
대기 중인 수집이 수집을 예약하고 클러스터에 데이터를 로드할 때까지 5~10분 정도 기다립니다. 그런 후, 다음 코드를 실행하여 StormEvents
테이블의 레코드 수를 가져옵니다.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
쿼리 문제 해결 실행
https://dataexplorer.azure.com에 로그인하고 클러스터에 연결합니다. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 수집 실패가 있었는지 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
다음 명령을 실행하여 지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
리소스 정리
다른 문서를 따르려는 경우 만든 리소스를 유지합니다. 그렇지 않으면 데이터베이스에서 다음 명령을 실행하여 StormEvents
테이블을 정리합니다.
.drop table StormEvents