다음을 통해 공유


Azure Data Explorer Node 라이브러리를 사용하여 데이터 수집

Azure 데이터 탐색기는 로그 및 원격 분석 데이터에 사용 가능한 빠르고 확장성이 우수한 데이터 탐색 서비스입니다. Azure 데이터 탐색기는 2개의 Node용 클라이언트 라이브러리, 수집 라이브러리데이터 라이브러리를 제공합니다. 이러한 라이브러리를 사용하여 데이터를 클러스터로 수집(로드)하고 코드에서 데이터를 쿼리할 수 있습니다. 이 문서에서는 먼저 테스트 클러스터에서 테이블 및 데이터 매핑을 만듭니다. 그런 다음, 클러스터 큐에 수집을 넣고 결과의 유효성을 검사합니다.

Azure 구독이 아직 없는 경우 시작하기 전에 Azure 체험 계정을 만듭니다.

필수 구성 요소

데이터 및 수집 라이브러리 설치

azure-kusto-ingestazure-kusto-data 설치

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

import 문 및 상수 추가

라이브러리에서 클래스 가져오기


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

애플리케이션을 인증하기 위해 Azure Data Explorer Microsoft Entra 테넌트 ID를 사용합니다. 테넌트 ID를 찾으려면 Microsoft 365 테넌트 ID 찾기를 따르세요.

이 코드를 실행하기 전에 authorityId, kustoUri, kustoIngestUrikustoDatabase의 값을 설정합니다.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

이제 연결 문자열을 구성합니다. 이 예제에서는 디바이스 인증을 사용하여 클러스터에 액세스합니다. 콘솔 출력을 확인하여 인증을 완료합니다. Microsoft Entra 애플리케이션 인증서, 애플리케이션 키 및 사용자 및 암호를 사용할 수도 있습니다.

이후 단계에서 대상 테이블 및 매핑을 만듭니다.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

소스 파일 정보 설정

추가 클래스를 가져오고 데이터 원본 파일에 대한 상수를 설정합니다. 이 예제에서는 Azure Blob Storage에 호스트된 예제 파일을 사용합니다. StormEvents 샘플 데이터 세트에는 국립 환경 정보 센터의 날씨 관련 데이터가 포함되어 있습니다.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

테스트 클러스터에 테이블 만들기

StormEvents.csv 파일에 있는 데이터 스키마와 일치하는 테이블을 만듭니다. 이 코드가 실행되면 다음과 같은 메시지가 반환됩니다. 로그인하려면 웹 브라우저를 사용하여 https://microsoft.com/devicelogin 페이지를 열고 코드 XXXXXXXXX를 입력하여 인증하세요. 단계에 따라 로그인한 후 돌아가서 다음 코드 블록을 실행합니다. 연결을 만드는 후속 코드 블록을 위해 다시 로그인해야 합니다.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

수집 매핑 정의

테이블을 만들 때 사용되는 데이터 형식과 열 이름에 들어오는 CSV 데이터를 매핑합니다.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

수집을 위해 메시지를 큐에 넣음

Blob Storage에서 데이터를 끌어온 후 Azure 데이터 탐색기에 수집하기 위해 메시지를 큐에 넣습니다.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

테이블에 데이터가 포함되어 있는지 검증

데이터가 테이블에 수집되었는지 검증합니다. 큐에 넣은 수집이 예약되고 데이터가 Azure 데이터 탐색기에 로드될 때까지 5~10분 정도 기다립니다. 그런 후, 다음 코드를 실행하여 StormEvents 테이블의 레코드 수를 가져옵니다.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

쿼리 문제 해결 실행

https://dataexplorer.azure.com에 로그인하고 클러스터에 연결합니다. 데이터베이스에서 다음 명령을 실행하여 지난 4시간 동안 수집 실패가 있었는지 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

다음 명령을 실행하여 지난 4시간 동안 진행된 모든 수집 작업의 상태를 확인합니다. 실행하기 전에 데이터베이스 이름을 바꿉니다.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

리소스 정리

다른 문서를 따르려는 경우 만든 리소스를 유지합니다. 그렇지 않으면 데이터베이스에서 다음 명령을 실행하여 StormEvents 테이블을 정리합니다.

.drop table StormEvents