다음을 통해 공유


SQL을 사용하여 파이프라인 코드 개발

Delta Live Tables에는 파이프라인에서 구체화된 뷰 및 스트리밍 테이블을 정의하기 위한 몇 가지 새로운 SQL 키워드 및 함수가 도입되었습니다. 파이프라인 개발을 위한 SQL 지원은 Spark SQL의 기본 사항을 기반으로 하며 구조적 스트리밍 기능에 대한 지원을 추가합니다.

PySpark DataFrames에 익숙한 사용자는 Python을 사용하여 파이프라인 코드를 개발하는 것을 선호할 수 있습니다. Python은 메타프로그래밍 작업과 같이 SQL로 구현하기 어려운 보다 광범위한 테스트 및 작업을 지원합니다. Python을 사용하여 파이프라인 코드 개발을 참조하세요.

Delta Live Tables SQL 구문에 대한 전체 참조는 Delta Live Tables SQL 언어 참조를 참조하세요.

파이프라인 개발을 위한 SQL의 기본 사항

Delta Live Tables 데이터 세트를 만드는 SQL 코드는 이 구문을 사용하여 CREATE OR REFRESH 쿼리 결과에 대해 구체화된 뷰 및 스트리밍 테이블을 정의합니다.

키워드는 STREAM 절에서 참조되는 데이터 원본을 SELECT 스트리밍 의미 체계로 읽어야 하는지를 나타냅니다.

Delta Live Tables 소스 코드는 SQL 스크립트와 매우 다릅니다. Delta Live Tables는 파이프라인에 구성된 모든 소스 코드 파일에서 모든 데이터 세트 정의를 평가하고 쿼리를 실행하기 전에 데이터 흐름 그래프를 작성합니다. Notebook 또는 스크립트에 표시되는 쿼리 순서는 실행 순서를 정의하지 않습니다.

SQL을 사용하여 구체화된 뷰 만들기

다음 코드 예제에서는 SQL을 사용하여 구체화된 뷰를 만들기 위한 기본 구문을 보여 줍니다.

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

SQL을 사용하여 스트리밍 테이블 만들기

다음 코드 예제에서는 SQL을 사용하여 스트리밍 테이블을 만들기 위한 기본 구문을 보여 줍니다.

참고 항목

모든 데이터 원본이 스트리밍 읽기를 지원하는 것은 아니며, 일부 데이터 원본은 항상 스트리밍 의미 체계로 처리되어야 합니다.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

개체 스토리지에서 데이터 로드

Delta Live Tables는 Azure Databricks에서 지원하는 모든 형식의 데이터 로드를 지원합니다. 데이터 형식 옵션을 참조하세요.

참고 항목

이러한 예제에서는 작업 영역에 자동으로 탑재된 데이터에서 /databricks-datasets 사용할 수 있는 데이터를 사용합니다. Databricks는 볼륨 경로 또는 클라우드 URI를 사용하여 클라우드 개체 스토리지에 저장된 데이터를 참조하는 것이 좋습니다. Unity 카탈로그 볼륨이란?을 참조하세요.

Databricks는 클라우드 개체 스토리지에 저장된 데이터에 대해 증분 수집 워크로드를 구성할 때 자동 로더 및 스트리밍 테이블을 사용하는 것이 좋습니다. 자동 로더란?을 참조하세요.

SQL은 이 read_files 함수를 사용하여 자동 로더 기능을 호출합니다. 키워드를 STREAM 사용하여 스트리밍 읽기 read_files를 구성해야 합니다.

다음 예제에서는 자동 로더를 사용하여 JSON 파일에서 스트리밍 테이블을 만듭니다.

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

또한 이 함수는 read_files 구체화된 뷰를 만들기 위한 일괄 처리 의미 체계를 지원합니다. 다음 예제에서는 일괄 처리 의미 체계를 사용하여 JSON 디렉터리를 읽고 구체화된 뷰를 만듭니다.

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

예상 데이터 유효성 검사

기대치를 사용하여 데이터 품질 제약 조건을 설정하고 적용할 수 있습니다. Delta Live Tables를 사용하여 데이터 품질 관리를 참조하세요.

다음 코드는 데이터 수집 중에 null인 레코드를 삭제하는 명명된 valid_data 예상을 정의합니다.

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

파이프라인에 정의된 구체화된 뷰 및 스트리밍 테이블 쿼리

스키마를 LIVE 사용하여 파이프라인에 정의된 다른 구체화된 뷰 및 스트리밍 테이블을 쿼리합니다.

다음 예제에서는 4개의 데이터 세트를 정의합니다.

  • JSON 데이터를 로드하는 스트리밍 테이블입니다 orders .
  • CSV 데이터를 로드하는 구체화된 뷰입니다 customers .
  • 데이터 세트와 데이터 세트의 orders 레코드를 조인하고customers, 주문 타임스탬프를 날짜로 캐스팅하고, , , stateorder_numberorder_date 필드를 선택하는 customer_id구체화 customer_orders 된 뷰입니다.
  • 각 상태에 대한 주문의 일일 수를 집계하는 구체화된 뷰입니다 daily_orders_by_state .
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;