Delta Live Tables 흐름을 사용하여 증분 방식으로 데이터 로드 및 처리
이 문서에서는 흐름의 정의와 Delta Live Tables 파이프라인의 흐름을 사용하여 원본에서 대상 스트리밍 테이블로 데이터를 증분 처리할 수 있는 방법을 설명합니다. Delta Live 테이블에서 흐름은 다음 두 가지 방법으로 정의됩니다.
- 스트리밍 테이블을 업데이트하는 쿼리를 만들 때 흐름이 자동으로 정의됩니다.
- 또한 Delta Live Tables는 여러 스트리밍 원본에서 스트리밍 테이블에 추가하는 등 보다 복잡한 처리를 위한 흐름을 명시적으로 정의하는 기능을 제공합니다.
이 문서에서는 스트리밍 테이블을 업데이트하는 쿼리를 정의할 때 생성되는 암시적 흐름에 대해 설명한 다음 더 복잡한 흐름을 정의하는 구문에 대한 세부 정보를 제공합니다.
흐름이란?
Delta Live Tables 에서 흐름 은 원본 데이터를 증분 방식으로 처리하여 대상 스트리밍 테이블을 업데이트하는 스트리밍 쿼리입니다. 파이프라인에서 만드는 대부분의 Delta Live Tables 데이터 세트는 흐름을 쿼리의 일부로 정의하며 흐름을 명시적으로 정의할 필요가 없습니다. 예를 들어 별도의 테이블 및 흐름 문을 사용하여 스트리밍 테이블을 만드는 대신 단일 DDL 명령으로 Delta Live Tables에 스트리밍 테이블을 만듭니다.
참고 항목
이 CREATE FLOW
예제는 설명용으로만 제공되며 유효한 Delta Live Tables 구문이 아닌 키워드를 포함합니다.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
쿼리에서 정의한 기본 흐름 외에도 Delta Live Tables Python 및 SQL 인터페이스는 추가 흐름 기능을 제공합니다. 추가 흐름은 단일 스트리밍 테이블을 업데이트하기 위해 여러 스트리밍 원본에서 데이터를 읽어야 하는 처리를 지원합니다. 예를 들어 기존 스트리밍 테이블과 흐름이 있고 이 기존 스트리밍 테이블에 쓰는 새 스트리밍 원본을 추가하려는 경우 추가 흐름 기능을 사용할 수 있습니다.
추가 흐름을 사용하여 여러 원본 스트림에서 스트리밍 테이블에 쓰기
Python 인터페이스의 @append_flow
데코레이터 또는 SQL 인터페이스의 CREATE FLOW
절을 사용하여 여러 스트리밍 원본의 스트리밍 테이블에 씁니다. 다음과 같은 작업을 처리하는 데 추가 흐름을 사용합니다.
- 전체 새로 고침을 요구하지 않고 기존 스트리밍 테이블에 데이터를 추가하는 스트리밍 원본을 추가합니다. 예를 들어 작업하는 모든 지역의 지역 데이터를 결합하는 테이블이 있을 수 있습니다. 새 지역이 롤아웃되면 전체 새로 고침을 수행하지 않고 테이블에 새 지역 데이터를 추가할 수 있습니다. 예제: 여러 Kafka 토픽의 스트리밍 테이블에 씁니다.
- 누락된 기록 데이터(백필)를 추가하여 스트리밍 테이블을 업데이트합니다. 예를 들어 Apache Kafka 토픽에서 작성한 기존 스트리밍 테이블이 있습니다. 또한 스트리밍 테이블에 정확히 한 번 삽입해야 하는 기록 데이터가 테이블에 저장되어 있으며 데이터를 삽입하기 전에 복잡한 집계 수행이 처리에 포함되므로 데이터를 스트리밍할 수 없습니다. 예제: 일회성 데이터 백필을 실행합니다.
- 쿼리에서 절을 사용하는
UNION
대신 여러 원본의 데이터를 결합하고 단일 스트리밍 테이블에 씁니다. 대신 추가 흐름 처리를UNION
사용하면 전체 새로 고침 업데이트를 실행하지 않고도 대상 테이블을 증분 방식으로 업데이트할 수 있습니다. 예제 : UNION 대신 추가 흐름 처리를 사용합니다.
추가 흐름 처리에 의한 레코드 출력의 대상은 기존 테이블 또는 새 테이블일 수 있습니다. Python 쿼리의 경우 create_streaming_table() 함수를 사용하여 대상 테이블을 만듭니다.
Important
- 예상을 사용하여 데이터 품질 제약 조건을 정의해야 하는 경우 대상 테이블에 대한 기대치를 함수의
create_streaming_table()
일부로 정의하거나 기존 테이블 정의에 정의합니다. 정의에서@append_flow
기대치를 정의할 수 없습니다. - 흐름은 흐름 이름으로 식별되며 이 이름은 스트리밍 검사점을 식별하는 데 사용됩니다. 흐름 이름을 사용하여 검사점을 식별하는 것은 다음을 의미합니다.
- 파이프라인의 기존 흐름 이름이 바뀌면 검사점이 이월되지 않으며 이름이 바뀐 흐름은 사실상 완전히 새로운 흐름입니다.
- 기존 검사점이 새 흐름 정의와 일치하지 않으므로 파이프라인에서 흐름 이름을 다시 사용할 수 없습니다.
다음은 다음 구문입니다 @append_flow
.
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
예: 여러 Kafka 토픽의 스트리밍 테이블에 쓰기
다음 예제에서는 이름이 지정된 kafka_target
스트리밍 테이블을 만들고 두 Kafka 토픽에서 해당 스트리밍 테이블에 씁니다.
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
SQL 쿼리에서 read_kafka()
사용되는 테이블 반환 함수에 대한 자세한 내용은 SQL 언어 참조의 read_kafka 참조하세요.
예: 일회성 데이터 백필 실행
다음 예제에서는 쿼리를 실행하여 기록 데이터를 스트리밍 테이블에 추가합니다.
참고 항목
백필 쿼리가 예약된 기준으로 또는 지속적으로 실행되는 파이프라인의 일부일 때 진정한 일회성 백필을 보장하려면 파이프라인을 한 번 실행한 후 쿼리를 제거합니다. 백필 디렉터리에 도착하는 경우 새 데이터를 추가하려면 쿼리를 그대로 둡니다.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
예: 대신 추가 흐름 처리 사용 UNION
절과 함께 UNION
쿼리를 사용하는 대신 추가 흐름 쿼리를 사용하여 여러 원본을 결합하고 단일 스트리밍 테이블에 쓸 수 있습니다. 대신 추가 흐름 쿼리를 UNION
사용하면 전체 새로 고침을 실행하지 않고 여러 원본에서 스트리밍 테이블에 추가할 수 있습니다.
다음 Python 예제에는 여러 데이터 원본을 절과 결합하는 쿼리가 UNION
포함되어 있습니다.
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
다음 예제에서는 쿼리를 UNION
추가 흐름 쿼리로 바꿉니다.
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);