다음을 통해 공유


Delta Live Tables 파이프라인에서 매개 변수 사용

이 문서에서는 Delta Live Tables 파이프라인 구성을 사용하여 파이프라인 코드를 매개 변수화하는 방법을 설명합니다.

참조 매개 변수

업데이트하는 동안 파이프라인 소스 코드는 구문을 사용하여 파이프라인 매개 변수에 액세스하여 Spark 구성에 대한 값을 가져올 수 있습니다.

키를 사용하여 파이프라인 매개 변수를 참조합니다. 소스 코드 논리가 평가되기 전에 값이 소스 코드에 문자열로 삽입됩니다.

다음 예제 구문에서는 키 source_catalog 와 값 dev_catalog 이 있는 매개 변수를 사용하여 구체화된 뷰에 대한 데이터 원본을 지정합니다.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

매개 변수 설정

임의의 키-값 쌍을 파이프라인에 대한 구성으로 전달하여 파이프라인에 매개 변수를 전달합니다. 작업 영역 UI 또는 JSON을 사용하여 파이프라인 구성을 정의하거나 편집하는 동안 매개 변수를 설정할 수 있습니다. Delta Live Tables 파이프라인 구성을 참조하세요.

파이프라인 매개 변수 키는 영숫자 문자만 포함 _ - . 할 수 있습니다. 매개 변수 값은 문자열로 설정됩니다.

파이프라인 매개 변수는 동적 값을 지원하지 않습니다. 파이프라인 구성에서 키와 연결된 값을 업데이트해야 합니다.

Important

예약된 파이프라인 또는 Apache Spark 구성 값과 충돌하는 키워드를 사용하지 마세요.

Python 또는 SQL에서 데이터 세트 선언 매개 변수화

데이터 세트를 정의하는 Python 및 SQL 코드는 파이프라인의 설정에 의해 매개 변수화될 수 있습니다. 매개 변수화를 사용하면 다음과 같은 사용 사례를 사용할 수 있습니다.

  • 코드에서 긴 경로 및 기타 변수를 분리합니다.
  • 개발 또는 스테이징 환경에서 처리되는 데이터의 양을 줄여 테스트 속도를 높입니다.
  • 동일한 변환 논리를 다시 사용하여 여러 데이터 원본에서 처리합니다.

다음 예제에서는 startDate 구성 값을 사용하여 개발 파이프라인을 입력 데이터의 하위 집합으로 제한합니다.

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

매개 변수를 사용하여 데이터 원본 제어

파이프라인 매개 변수를 사용하여 동일한 파이프라인의 여러 구성에서 서로 다른 데이터 원본을 지정할 수 있습니다.

예를 들어 변수 data_source_path 를 사용하여 파이프라인에 대한 개발, 테스트 및 프로덕션 구성에서 다른 경로를 지정한 다음, 다음 코드를 사용하여 참조할 수 있습니다.

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

이 패턴은 초기 수집 중에 수집 논리가 스키마 또는 잘못된 형식의 데이터를 처리하는 방법을 테스트하는 데 유용합니다. 데이터 세트를 전환하는 동안 모든 환경에서 전체 파이프라인에서 동일한 코드를 사용할 수 있습니다.