Flink/Delta 커넥터를 사용하는 방법
참고 항목
2025년 1월 31일에 Azure HDInsight on AKS가 사용 중지됩니다. 2025년 1월 31일 이전에 워크로드가 갑자기 종료되지 않도록 워크로드를 Microsoft Fabric 또는 동등한 Azure 제품으로 마이그레이션해야 합니다. 구독의 나머지 클러스터는 호스트에서 중지되고 제거됩니다.
사용 중지 날짜까지 기본 지원만 사용할 수 있습니다.
Important
이 기능은 현지 미리 보기로 제공됩니다. Microsoft Azure 미리 보기에 대한 보충 사용 약관에는 베타 또는 미리 보기로 제공되거나 아직 일반 공급으로 릴리스되지 않은 Azure 기능에 적용되는 더 많은 약관이 포함되어 있습니다. 이 특정 미리 보기에 대한 자세한 내용은 Azure HDInsight on AKS 미리 보기 정보를 참조하세요. 질문이나 기능 제안이 있는 경우 세부 정보와 함께 AskHDInsight에 요청을 제출하고 Azure HDInsight 커뮤니티에서 추가 업데이트를 보려면 팔로우하세요.
Apache Flink와 Delta Lake를 함께 사용하면 안정적이고 스케일링 가능한 데이터 레이크하우스 아키텍처를 만들 수 있습니다. Flink/Delta 커넥터를 사용하면 ACID 트랜잭션을 사용하여 정확히 한 번 처리하는 Delta 테이블에 데이터를 쓸 수 있습니다. 즉, 검사점에서 Flink 파이프라인을 다시 시작하는 경우에도 데이터 스트림이 일관되고 오류가 없습니다. Flink/Delta 커넥터는 데이터가 손실되거나 중복되지 않고 Flink 의미 체계와 일치하도록 합니다.
이 문서에서는 Flink-Delta 커넥터를 사용하는 방법에 대해 알아봅니다.
- Delta 테이블에서 데이터를 읽습니다.
- Delta 테이블에 데이터를 씁니다.
- Power BI에서 쿼리합니다.
Flink/Delta 커넥터란?
Flink/Delta 커넥터는 Apache Flink 애플리케이션에서 Delta 독립 실행형 JVM 라이브러리를 활용하는 Delta 테이블로 데이터를 읽고 쓰는 JVM 라이브러리입니다. 커넥터는 정확히 한 번 전송 보장을 제공합니다.
Flink/Delta 커넥터에는 다음이 포함됩니다.
Apache Flink에서 Delta 테이블에 데이터를 쓰기 위한 DeltaSink입니다. Apache Flink를 사용하여 Delta 테이블을 읽기 위한 DeltaSource입니다.
Apache Flink-Delta 커넥터에 포함:
커넥터 버전에 따라 다음 Apache Flink 버전과 함께 사용할 수 있습니다.
Connector's version Flink's version
0.4.x (Sink Only) 1.12.0 <= X <= 1.14.5
0.5.0 1.13.0 <= X <= 1.13.6
0.6.0 X >= 1.15.3
0.7.0 X >= 1.16.1 --- We use this in Flink 1.17.0
필수 조건
- AKS의 HDInsight Flink 1.17.0 클러스터
- Flink-Delta 커넥터 0.7.0
- MSI를 사용하여 ADLS Gen2에 액세스
- 개발용 IntelliJ
Delta 테이블에서 데이터 읽기
Delta 원본은 다음과 같이 두 가지 모드 중 하나로 작동할 수 있습니다.
제한된 모드 특정 테이블 버전에 대해서만 Delta 테이블의 콘텐츠를 읽으려는 일괄 작업에 적합합니다. DeltaSource.forBoundedRowData API를 사용하여 이 모드의 원본을 만듭니다.
연속 모드 새로운 변경 내용과 버전이 있는지 Delta 테이블을 지속적으로 확인하려는 스트리밍 작업에 적합합니다. DeltaSource.forContinuousRowData API를 사용하여 이 모드의 원본을 만듭니다.
예: 제한된 모드에서 모든 열을 읽기 위한 Delta 테이블의 원본 만들기. 일괄 작업에 적합합니다. 이 예에서는 최신 테이블 버전을 로드합니다.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Delta 싱크에 쓰기
현재 Delta Sink는 다음과 같은 Flink 메트릭을 공개합니다.
분할되지 않은 테이블에 대한 싱크 만들기
이 예에서는 DeltaSink를 만들고 이를 기존 org.apache.flink.streaming.api.datastream.DataStream
에 연결하는 방법을 보여 줍니다.
import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
전체 코드
델타 테이블에서 데이터를 읽고 다른 델타 테이블에 싱크합니다.
package contoso.example;
import io.delta.flink.sink.DeltaSink;
import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
public class DeltaSourceExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Define the sink Delta table path
String deltaTablePath_sink = "abfss://container@account_name.dfs.core.windows.net/data/testdelta_output";
// Define the source Delta table path
String deltaTablePath_source = "abfss://container@account_name.dfs.core.windows.net/data/testdelta";
// Define the source Delta table path
RowType rowType = RowType.of(
DataTypes.STRING().getLogicalType(), // Date
DataTypes.STRING().getLogicalType(), // Time
DataTypes.STRING().getLogicalType(), // TargetTemp
DataTypes.STRING().getLogicalType(), // ActualTemp
DataTypes.STRING().getLogicalType(), // System
DataTypes.STRING().getLogicalType(), // SystemAge
DataTypes.STRING().getLogicalType() // BuildingID
);
// Create a bounded Delta source for all columns
DataStream<RowData> deltaStream = createBoundedDeltaSourceAllColumns(env, deltaTablePath_source);
createDeltaSink(deltaStream, deltaTablePath_sink, rowType);
// Execute the Flink job
env.execute("Delta datasource and sink Example");
}
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new Path(deltaTablePath),
new Configuration())
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
new Configuration(),
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
}
Maven Pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>contoso.example</groupId>
<artifactId>FlinkDeltaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<hadoop-version>3.3.4</hadoop-version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-flink</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
jar를 패키지하고 Flink 클러스터에 제출하여 실행함
AppMode 클러스터에 작업 jar 정보를 전달합니다.
참고 항목
ADLS를 읽거나 쓰는 동안 항상
hadoop.classpath.enable
을 사용하도록 설정합니다.클러스터를 제출하면 Flink UI에서 작업을 볼 수 있습니다.
ADLS에서 결과를 찾습니다.
Power BI 통합
데이터가 Delta 싱크에 있으면 Power BI 데스크톱에서 쿼리를 실행하고 보고서를 만들 수 있습니다.
Power BI Desktop을 열어 ADLS Gen2 커넥터를 사용하여 데이터를 가져옵니다.
스토리지 계정의 URL입니다.
원본에 대한 M-쿼리를 만들고 스토리지 계정에서 데이터를 쿼리하는 함수를 호출합니다.
데이터를 쉽게 사용할 수 있게 되면 보고서를 만들 수 있습니다.
참조
- Apache, Apache Flink, Flink 및 연결된 오픈 소스 프로젝트 이름은 ASF(Apache Software Foundation)의 상표입니다.