다음을 통해 공유


Apache Flink를 사용하여 Azure Data Explorer로 데이터 수집

Apache Flink는 바인딩되지 않은 데이터 스트림과 바인딩된 데이터 스트림에 대한 상태 저장 계산을 위한 프레임워크이자 분산 처리 엔진입니다.

Flink 커넥터는 모든 Flink 클러스터에서 실행할 수 있는 오픈 소스 프로젝트입니다. Flink 클러스터에서 데이터를 이동하기 위한 데이터 싱크를 구현합니다. Apache Flink에 대한 커넥터를 사용하여 ML(기계 학습), ETL(추출-변환-로드) 및 Log Analytics와 같은 데이터 기반 시나리오를 대상으로 하는 빠르고 확장 가능한 애플리케이션을 빌드할 수 있습니다.

이 문서에서는 Flink 커넥터를 사용하여 Flink에서 테이블로 데이터를 보내는 방법을 알아봅니다. 테이블 및 데이터 매핑을 만들고 Flink를 직접 만들어 테이블에 데이터를 보낸 다음 결과의 유효성을 검사합니다.

필수 조건

Maven을 사용하여 종속성을 관리하는 Flink 프로젝트의 경우 종속성으로 추가하여 Azure Data Explorer용 Flink 커넥터 코어 싱크를 통합합니다.

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Maven을 사용하여 종속성을 관리하지 않는 프로젝트의 경우 Apache Flink용 Azure Data Explorer 커넥터에 대한 리포지토리를 복제하고 로컬로 빌드합니다. 이 방법을 사용하면 명령을 mvn clean install -DskipTests사용하여 로컬 Maven 리포지토리에 커넥터를 수동으로 추가할 수 있습니다.

Flink에서 Microsoft Entra ID 애플리케이션 또는 관리 ID를 사용하여 인증할 수 있습니다.

이 서비스 주체는 커넥터가 Kusto에서 테이블을 작성하는 데 사용하는 ID입니다. 나중에 이 서비스 주체가 Kusto 리소스에 액세스할 수 있는 권한을 부여합니다.

  1. Azure CLI를 통해 Azure 구독에 로그인합니다. 그런 다음 브라우저에서 인증합니다.

    az login
    
  2. 보안 주체를 호스트할 구독을 선택합니다. 이 단계는 여러 구독이 있는 경우에 필요합니다.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 서비스 주체를 만듭니다. 이 예에서는 서비스 주체를 my-service-principal이라고 합니다.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 반환된 JSON 데이터에서 복사 passwordtenant 하고 appId나중에 사용할 수 있습니다.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Microsoft Entra 애플리케이션과 서비스 주체를 만들었습니다.

  1. 데이터베이스에 대한 애플리케이션 사용자 권한을 부여합니다.

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. 테이블에 대한 수집기 또는 관리자 권한을 애플리케이션에 부여합니다. 필요한 권한은 선택한 데이터 쓰기 방법에 따라 달라집니다. 수집기 권한은 SinkV2에 충분하지만 WriteAndSink에는 관리자 권한이 필요합니다.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

권한 부여에 대한 자세한 내용은 Kusto 역할 기반 액세스 제어를 참조 하세요.

Flink에서 데이터를 쓰려면 다음을 수행합니다.

  1. 필요한 옵션을 가져옵니다.

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. 애플리케이션 또는 관리 ID를 사용하여 인증합니다.

    애플리케이션 인증의 경우:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    관리 ID 인증의 경우:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. 데이터베이스 및 테이블과 같은 싱크 매개 변수를 구성합니다.

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    다음 표에 설명된 대로 더 많은 옵션을 추가할 수 있습니다.

    옵션 설명 기본값
    IngestionMappingRef 기존 수집 매핑을 참조합니다.
    FlushImmediately 데이터를 즉시 플러시하고 성능 문제를 일으킬 수 있습니다. 이 메서드는 권장되지 않습니다.
    BatchIntervalMs 데이터가 플러시되는 빈도를 제어합니다. 30초
    BatchSize 플러시하기 전에 버퍼링 레코드에 대한 일괄 처리 크기를 설정합니다. 1,000개 레코드
    ClientBatchSizeLimit 수집하기 전에 집계된 데이터의 크기(MB)를 지정합니다. 300MB
    PollForIngestionStatus true이면 커넥터는 데이터 플러시 후 수집 상태를 폴링합니다. false
    DeliveryGuarantee 배달 보장 의미 체계를 결정합니다. 정확히 한 번 의미 체계를 달성하려면 WriteAheadSink를 사용합니다. AT_LEAST_ONCE
  2. 다음 방법 중 하나를 사용하여 스트리밍 데이터를 씁니다.

    • SinkV2: 검사점에서 데이터를 플러시하여 한 번 이상의 일관성을 보장하는 상태 비주류 옵션입니다. 대용량 데이터 수집에는 이 옵션을 사용하는 것이 좋습니다.
    • WriteAheadSink: 이 메서드는 KustoSink에 데이터를 내보낸다. Flink의 검사점 시스템과 통합되며 정확히 한 번 보장됩니다. 데이터는 AbstractStateBackend에 저장되고 검사점이 완료된 후에만 커밋됩니다.

    다음 예제에서는 SinkV2를 사용합니다. WriteAheadSink를 사용하려면 다음 대신 메서드를 buildWriteAheadSink build사용합니다.

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

전체 코드는 다음과 같습니다.

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

데이터가 수집되었는지 확인

연결이 구성되면 데이터가 테이블로 전송됩니다. KQL 쿼리를 실행하여 데이터가 수집되는지 확인할 수 있습니다.

  1. 다음 쿼리를 실행하여 데이터가 테이블에 수집되는지 확인합니다.

    <MyTable>
    | count
    
  2. 다음 쿼리를 실행하여 데이터를 봅니다.

    <MyTable>
    | take 100
    
  • 쿼리 작성
  • Azure Data Explorer를 사용하여 AKS의 Azure HDInsight에서 Apache Flink 사용