다음을 통해 공유


CREATE STREAMING TABLE

적용 대상:예로 표시된 확인 Databricks SQL

스트리밍 table을(를) 생성하며, 스트리밍이나 증분 데이터 처리를 추가 지원하는 델타 table을(를) 포함합니다.

스트리밍 tables는 Delta Live Tables에서만 지원되며 Unity Catalog이 있는 Databricks SQL에서도 지원됩니다. 지원되는 Databricks Runtime 컴퓨팅에서 이 명령을 실행하면 구문만 구문 분석됩니다. SQL을 사용하여 파이프라인 코드 개발을 참조하세요.

구문

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    쿼리에 정의된 원본에서 가장 최신 데이터를 받아 table를 새로 고칩니다(지정한 경우). 쿼리가 시작되기 전에 도착하는 새 데이터만 처리됩니다. 명령을 실행하는 동안 원본에 추가되는 새 데이터는 다음 refresh때까지 무시됩니다. CREATE OR REFRESHrefresh 작업은 완전히 선언적입니다. refresh 명령이 원래 table 생성 명령문의 모든 메타데이터를 지정하지 않으면 지정되지 않은 메타데이터는 삭제됩니다.

  • IF NOT EXISTS

    스트리밍 table 생성합니다(없는 경우). 이 이름의 table 이미 있는 경우 CREATE STREAMING TABLE 문이 무시됩니다.

    IF NOT EXISTS 또는 OR REFRESH 중 최대 하나를 지정할 수 있습니다.

  • table_name

    만들어질 table의 이름입니다. 이름에는 임시 사양 또는 옵션 사양이 포함되어서는 안됩니다. 이름이 적절히 지정되지 않은 경우, 현재 schema에서 table이 만들어집니다.

  • table_specification

    이 선택적 절은 columns의 list, 그 형식, 속성, 설명 및 column의 제약 조건을 정의합니다.

    columns을 tableschema에 정의하지 않으면 AS query을 지정해야 합니다.

    • column_identifier

      column의 고유한 이름.

      • column_type

        column의 데이터 유형을 지정합니다.

      • NOT NULL

        명시된 경우 column는 NULLvalues를 허용하지 않습니다.

      • COMMENT column_comment

        column을 설명하는 문자열 리터럴입니다.

      • column_constraint

        Important

        이 기능은 공개 미리 보기 상태입니다.

        스트리밍 table에서 기본 키 또는 외래 키 constraint을/를 column에 추가합니다. hive_metastore catalog에서 tables에 대한 제약 조건은 지원되지 않습니다.

      • MASK 절

        Important

        이 기능은 공개 미리 보기 상태입니다.

        column 마스크 함수를 추가하여 중요한 데이터를 익명화합니다. 해당 column의 모든 후속 쿼리는 column의 원래 값을 대신해 column에 대해 그 함수를 평가한 결과를 받습니다. 이는 함수가 호출하는 사용자의 ID 또는 그룹 멤버 자격을 검사하여 값을 수정할지 여부를 결정할 수 where 세분화된 액세스 제어 목적에 유용할 수 있습니다.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        table에 데이터 품질 기대사항을 추가합니다. 이러한 데이터 품질 기대치는 시간이 지남에 따라 추적되며 스트리밍 table의 이벤트 로그를 통해 액세스할 수 있습니다. FAIL UPDATE 예상으로 인해 table을 만들 때와 table를 새로 고칠 때 처리가 실패하게 됩니다. DROP ROW 기대치로 인해 기대치가 충족되지 않으면 전체 행이 삭제됩니다.

        expectation_expr는 리터럴, table내의 column 식별자, 그리고 다음을 제외한 결정적이면서 기본적으로 제공되는 SQL 함수 또는 연산자로 구성될 수 있습니다.

        또한 expr에는 하위 쿼리가 포함되어서는 안 됩니다.

      • table_constraint

        Important

        이 기능은 공개 미리 보기 상태입니다.

        스트리밍 table에 정보 기본 키 또는 정보 외래 키 제약 조건을 추가합니다. hive_metastore catalog에서는 tables에 대한 키 제약 조건이 지원되지 않습니다.

  • table_clauses

    옵션으로 분할, 주석, 사용자 정의 속성 및 새 table에 대한 refresh 일정을 지정할 수 있습니다. 각 하위 절은 한 번만 지정할 수 있습니다.

    • PARTITIONED BY

      table의 columns의 선택적 list은(는) partition에 의해 table를.

    • COMMENT table_comment

      STRING 리터럴을 사용하여 table을 설명합니다.

    • TBLPROPERTIES

      선택적으로 하나 이상의 사용자 정의 속성을 설정합니다.

      이 설정을 사용하여 이 문을 실행하는 데 사용되는 Delta Live Tables 런타임 채널을 지정합니다. Set pipelines.channel 속성 값을 "PREVIEW" 또는 "CURRENT"로 설정합니다. 기본값은 "CURRENT"입니다. Delta Live Tables 채널에 대한 자세한 내용은 Delta Live Tables 런타임 채널참조하세요.

    • 일정 [ REFRESH ] 일정_조항

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      Important

      이 기능은 공개 미리 보기 상태입니다.

      주기적으로 발생하는 refresh을 예약하려면 EVERY 구문을 이용하세요. EVERY 구문을 지정하면 스트리밍 table 또는 구체화된 뷰는 제공된 값(예: HOUR, HOURS, DAY, DAYS, WEEK또는 WEEKS)에 따라 지정된 간격으로 주기적으로 새로 고쳐집니다. 다음 table 에는 number에 허용되는 정수 values 이 나열되어 있습니다.

      Time unit 정수 값
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      참고 항목

      포함된 시간 단위의 단수 및 복수 형태는 의미상 동일합니다.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      quartz cron 값을 사용하여 refresh 예약합니다. 유효한 time_zone_values 허용됩니다. AT TIME ZONE LOCAL은 지원되지 않습니다.

      AT TIME ZONE이 없는 경우 세션 표준 시간대가 사용됩니다. AT TIME ZONE가 없고 세션 시간대가 set이 아니면 오류가 발생합니다. SCHEDULESCHEDULE REFRESH와 의미 체계가 같습니다.

    일정은 CREATE 명령의 일부로 제공할 수 있습니다. ALTER STREAMING TABLE을 사용하거나 SCHEDULE 절과 함께 CREATE OR REFRESH 명령을 실행하여 생성 후 스트리밍 table의 일정을 변경합니다.

  • ROW FILTER 절과 함께

    Important

    이 기능은 공개 미리 보기 상태입니다.

    table에 행 필터 기능을 추가합니다. 모든 후속 쿼리의 table는 함수가 부울 TRUE로 평가하는 where 행의 하위 집합을 받습니다. 이는 함수가 호출하는 사용자의 ID 또는 그룹 멤버 자격을 검사하여 특정 행을 필터링할지 여부를 결정할 수 where 세분화된 액세스 제어 목적에 유용할 수 있습니다.

  • AS query

    이 절은 query데이터를 사용하여 table 채웁니다. 이 쿼리는 스트리밍 쿼리여야 합니다. 이 작업은 증분 방식으로 처리하려는 모든 관계로 STREAM 키워드를 추가하여 수행할 수 있습니다. query table_specification 함께 지정하면 table_specification 지정된 tableschemaquery반환된 모든 columns 포함해야 합니다. 그렇지 않으면 오류를 get. table_specification에 지정되었으나 query가 반환하지 않은 columns는 쿼리할 때 nullvalues를 반환합니다.

스트리밍 tables 및 기타 tables 간의 차이점

스트리밍 tables은 각 행을 한 번만 처리할 수 있도록 설계된 상태 저장 tables로, 증가하는 데이터 세트를 다룹니다. 대부분의 데이터 세트는 시간이 지남에 따라 지속적으로 증가하기 때문에 스트리밍 tables는 대부분의 데이터 수집 워크로드에 적합합니다. 스트리밍 tables는 데이터 최신성과 짧은 대기 시간이 필요한 파이프라인에 최적입니다. 스트리밍 tables는 대규모 변환에 유용할 수 있습니다. 새 데이터가 도착할 때마다 결과를 증분 방식으로 계산하여, 각 update에서 모든 원본 데이터를 전부 재계산하지 않고도 결과를 지속적으로 최신 상태로 유지할 수 있습니다. 스트리밍 tables은 추가 전용 데이터 원본을 위해 설계되었습니다.

스트리밍 tables은 쿼리에서 제공된 원본에서 사용할 수 있는 최신 데이터를 처리하는 REFRESH같은 추가 명령을 수락합니다. 제공된 쿼리에 대한 변경 내용은 이전에 처리된 데이터가 아닌 REFRESH호출하여 새 데이터에만 반영될 get 있습니다. 기존 데이터에도 변경 내용을 적용하려면 REFRESH TABLE <table_name> FULL을 실행하여 FULL REFRESH를 수행해야 합니다. 전체 새로 고침은 원본에서 사용 가능한 모든 데이터를 최신 정의로 다시 처리합니다. 보존 기간이 짧거나 데이터의 전체 기록을 유지하지 않는 소스, 예를 들어 Kafka와 같은 경우에는 전체 새로 고침을 호출하지 않는 것이 좋습니다. 이는 전체 refresh가 기존 데이터를 삭제하기 때문입니다. 더 이상 원본에서 데이터를 사용할 수 없는 경우 이전 데이터를 복구하지 못할 수 있습니다.

행 필터 및 column 마스크

Important

이 기능은 공개 미리 보기 상태입니다.

행 필터를 사용하면 table 스캔이 행을 가져올 때마다 필터 역할을 하는 함수를 지정할 수 있습니다. 이러한 필터를 통해 후속 쿼리는 필터 조건자가 true로 평가되는 행만 반환합니다.

Column 마스크는 table 검사가 행을 가져올 때마다 column의 values를 마스크하는 기능을 제공합니다. 해당 column와 관련된 모든 향후 쿼리는 column에 대한 함수 평가 결과를 통해 column의 원래 값이 대체된 결과를 받게 됩니다.

행 필터 및 마스크를 사용하는 방법에 대한 자세한 내용은 행 필터 및 마스크사용하여 중요한 데이터 필터링 참조하세요.

행 필터 및 Column 마스크 관리

스트리밍 tables의 행 필터와 column 마스크는 CREATE OR REFRESH 문을 통해 추가하거나, 업데이트하거나, 삭제해야 합니다.

동작

  • Definer: 또는 문이 스트리밍 경우 행 필터 함수는 정의자의 권한( 소유자)과 함께 실행됩니다. 이는 tablerefresh이 스트리밍 table를 만든 사용자의 보안 컨텍스트를 사용한다는 것을 의미합니다.
  • 쿼리: 대부분의 필터는 정의자의 권한으로 실행되지만 사용자 컨텍스트(예: CURRENT_USERIS_MEMBER)를 확인하는 함수는 예외입니다. 이러한 함수는 호출자로 실행됩니다. 이 방법은 현재 사용자의 컨텍스트에 따라 사용자별 데이터 보안 및 액세스 제어를 적용합니다.

가시성

DESCRIBE EXTENDED, INFORMATION_SCHEMA또는 Catalog 탐색기를 사용하여 주어진 스트리밍 table에 적용되는 기존 행 필터 및 column 마스크를 검사합니다. 이 기능을 사용하면 사용자가 스트리밍 tables대한 데이터 액세스 및 보호 조치를 감사하고 검토할 수 있습니다.

제한 사항

  • table 소유자만 refresh 스트리밍을 tables 할 수 있으며 최신 데이터를 get 수 있습니다.
  • 스트리밍 tables에서는 ALTER TABLE 명령이 허용되지 않습니다. table 정의 및 속성은 CREATE OR REFRESH 또는 ALTER STREAMING TABLE 문을 통해 변경해야 합니다.
  • INSERT INTOMERGE 같은 DML 명령을 통해 tableschema 발전하는 것은 지원되지 않습니다.
  • 스트리밍 tables에서 다음의 명령은 지원되지 않습니다.
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing은 지원되지 않습니다.
  • table 이름 바꾸기 또는 소유자 변경은 지원되지 않습니다.
  • PRIMARY KEYFOREIGN KEY 같은 Table 제약 조건은 지원되지 않습니다.
  • 생성된 columns, ID columns및 기본 columns 지원되지 않습니다.

예제

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')