Udostępnij za pośrednictwem


CREATE STREAMING TABLE

Dotyczy:zaznacz pole wyboru oznaczone jako tak Databricks SQL

Tworzy przesyłania strumieniowego tableoraz Deltę table z dodatkową obsługą przesyłania strumieniowego lub przyrostowego przetwarzania danych.

tables przesyłania strumieniowego są obsługiwane tylko w usłudze Delta Live Tables i w usłudze Databricks SQL z programem Unity Catalog. Uruchomienie tego polecenia w obsługiwanym środowisku Databricks Runtime oblicza tylko składnię. Zobacz Tworzenie kodu potoku przy użyciu języka SQL.

Składnia

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ] }

Parameters

  • REFRESH

    Jeśli sprecyzowano, odświeży table z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu. Tylko nowe dane, które docierają przed rozpoczęciem zapytania, są przetwarzane. Nowe dane dodawane do źródeł podczas wykonywania polecenia są ignorowane do następnego refresh. Operacja refresh z funkcji CREATE OR REFRESH jest w pełni deklaratywna. Jeśli polecenie refresh nie określa wszystkich metadanych z oryginalnej instrukcji tworzenia table, nieokreślone metadane zostaną usunięte.

  • JEŚLI NIE ISTNIEJE

    Tworzy przesył strumieniowy table, jeśli nie istnieje. Jeśli table o tej nazwie już istnieje, instrukcja CREATE STREAMING TABLE jest ignorowana.

    Możesz określić co najwyżej jeden z IF NOT EXISTS elementów lub OR REFRESH.

  • table_name

    Nazwa dla table do utworzenia. Nazwa nie może zawierać specyfikacji czasowej ani specyfikacji opcji. Jeśli nazwa nie jest kwalifikowana, table zostanie utworzona w bieżącym schema.

  • table_specification

    Ta opcjonalna klauzula definiuje listcolumns, ich typy, właściwości, opisy oraz ograniczenia column.

    Jeśli nie zdefiniujesz columns w tableschema należy określić AS query.

    • column_identifier

      Unikatowa nazwa dla column.

      • column_type

        Określa typ danych dla column.

      • NOT NULL

        Jeśli określono, column nie akceptuje NULLvalues.

      • COLUMN_COMMENT KOMENTARZ

        Ciąg znaków opisujący column.

      • column_constraint

        Ważne

        Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

        Dodaje klucz podstawowy lub klucz obcy constraint do column w przesyłaniu strumieniowym table. Ograniczenia nie są obsługiwane w przypadku tables w hive_metastorecatalog.

      • KLAUZULA MASK

        Ważne

        Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

        Dodaje funkcję maski column do anonimizacji poufnych danych. Wszystkie kolejne zapytania z tej column otrzymują wynik oceny tej funkcji w column zamiast oryginalnej wartości column. Może to być przydatne w celach szczegółowej kontroli dostępu, where funkcja może sprawdzić tożsamość lub członkostwo w grupach wywoływanego użytkownika, aby zdecydować, czy zredagować wartość.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ PRZY NARUSZENIU { FAIL UPDATE | DROP ROW } ]

        Dodaje oczekiwania dotyczące jakości danych do table. Te oczekiwania jakości danych można śledzić w czasie i uzyskiwać do nich dostęp za pośrednictwem dziennika zdarzeń przesyłania strumieniowego table. Oczekiwanie FAIL UPDATE powoduje awarię przetwarzania przy jednoczesnym tworzeniu table i odświeżaniu table. Oczekiwanie DROP ROW powoduje porzucenie całego wiersza, jeśli oczekiwanie nie zostanie spełnione.

        expectation_expr mogą składać się z literałów, identyfikatorów column wewnątrz tableoraz deterministycznych, wbudowanych funkcji SQL lub operatorów, z wyjątkiem:

        Ponadto expr nie może zawierać żadnego podzapytania.

      • table_constraint

        Ważne

        Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

        Dodaje ograniczenia informacyjnego klucza podstawowego lub informacyjnego klucza obcego do strumienia danych table. Ograniczenia klucza nie są obsługiwane w przypadku tables w hive_metastorecatalog.

  • table_clauses

    Opcjonalnie określ partycjonowanie, komentarze, właściwości zdefiniowane przez użytkownika i harmonogram refresh dla nowego table. Każda klauzula podrzędna może być określona tylko raz.

    • PARTYCJONOWANE PRZEZ

      Opcjonalna list z columns do table, aby partitiontable.

    • TABLE_COMMENT KOMENTARZ

      Literał STRING, aby opisać table.

    • TBLPROPERTIES

      Opcjonalnie ustawia co najmniej jedną właściwość zdefiniowaną przez użytkownika.

      Użyj tego ustawienia, aby określić kanał środowiska uruchomieniowego usługi Delta Live Tables używany do uruchamiania tej instrukcji. Ustaw Set wartość właściwości pipelines.channel na "PREVIEW" lub "CURRENT". Domyślna wartość to "CURRENT". Aby uzyskać więcej informacji na temat kanałów usługi Delta Live Tables, zobacz kanały środowiska uruchomieniowego usługi Delta Live Tables.

    • HARMONOGRAM [ REFRESH ] klauzula_harmonogramu

    • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

      Aby zaplanować refresh, które występuje okresowo, użyj składni EVERY. Jeśli określono składnię EVERY, widok przesyłania strumieniowego table lub zmaterializowany widok jest okresowo odświeżany w określonym interwale w oparciu o podaną wartość, taką jak HOUR, HOURS, DAY, DAYS, WEEKlub WEEKS. Poniższa table zawiera listę akceptowanych liczb całkowitych values dla number.

      Time unit Wartość całkowita
      HOUR or HOURS 1 <= H <= 72
      DAY or DAYS 1 <= D <= 31
      WEEK or WEEKS 1 <= W <= 8

      Uwaga

      Liczba pojedyncza i mnoga dołączonej jednostki czasowej są semantycznie równoważne.

    • CRON cron_string [ AT TIME ZONE timezone_id ]

      Aby zaplanować refresh przy użyciu wartości cron. Akceptowane są prawidłowe time_zone_values . AT TIME ZONE LOCAL nie jest obsługiwana.

      Jeśli AT TIME ZONE jest nieobecny, używana jest strefa czasowa sesji. Jeśli AT TIME ZONE jest nieobecny, a strefa czasowa sesji nie jest set, zostanie zgłoszony błąd. SCHEDULEjest semantycznie równoważne .SCHEDULE REFRESH

    Harmonogram można podać w ramach CREATE polecenia . Użyj ALTER STREAMING TABLE lub uruchom polecenie CREATE OR REFRESH z klauzulą SCHEDULE, aby zmienić harmonogram przesyłania strumieniowego dla table po jego utworzeniu.

  • Z klauzulą ROW FILTER

    Ważne

    Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

    Dodaje funkcję filtrowania wierszy do table. Wszystkie kolejne zapytania z tego table uzyskują podzbiór wierszy where, które funkcja ocenia jako wartość logiczną TRUE. Może to być przydatne w celach szczegółowej kontroli dostępu, where funkcja może sprawdzić tożsamość lub członkostwo w grupach wywoływanego użytkownika, aby zdecydować, czy filtrować niektóre wiersze.

  • Zapytanie AS

    Ta klauzula wypełnia table przy użyciu danych z query. To zapytanie musi być zapytaniem przesyłanym strumieniowo. Można to osiągnąć, dodając STREAM słowo kluczowe do dowolnej relacji, którą chcesz przetwarzać przyrostowo. Po określeniu query i table_specification razem tableschema określony w table_specification musi zawierać wszystkie columns zwrócone przez query, w przeciwnym razie get błąd. Wszystkie columns określone w table_specification, ale niezwracane przez query, zwracają nullvalues podczas wykonywania zapytania.

Różnice między przesyłaniem strumieniowym tables a innymi tables

Przesyłanie strumieniowe tables jest stanowe tables, zaprojektowane do jednokrotnej obsługi każdej pozycji w miarę przetwarzania rosnącego zestawu danych. Ponieważ większość zestawów danych stale rośnie wraz z upływem czasu, przesyłanie strumieniowe tables jest dobre dla większości obciążeń pozyskiwania. Strumieniowanie tables jest optymalne dla potoków wymagających świeżości danych i niskich opóźnień. Przesyłanie strumieniowe tables może być również przydatne w przypadku transformacji na dużą skalę, ponieważ wyniki mogą być obliczane przyrostowo po nadejściu nowych danych, zapewniając aktualizację wyników bez konieczności pełnego ponownego przeliczania wszystkich danych źródłowych z każdym update. Przesyłanie strumieniowe tables jest przeznaczone dla źródeł danych, które są tylko do odczytu i dodawania.

Usługa przesyłania strumieniowego tables akceptuje dodatkowe polecenia, takie jak REFRESH, które przetwarzają najnowsze dane dostępne w źródłach podanych w zapytaniu. Zmiany w podanym zapytaniu get są odzwierciedlane tylko w nowych danych przez wywołanie REFRESH, których nie przetwarzano wcześniej. Aby zastosować zmiany w istniejących danych, należy wykonać REFRESH TABLE <table_name> FULL polecenie , aby wykonać polecenie FULL REFRESH. Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne refresh obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.

Filtry wierszy i maski column

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Filtry wierszy umożliwiają określenie funkcji, która ma zastosowanie jako filtr za każdym razem, gdy skanowanie table pobiera wiersze. Te filtry zapewniają, że kolejne zapytania zwracają tylko wiersze, dla których predykat filtru daje wartość true.

Maski Column pozwalają zamaskować values w columnza każdym razem, gdy skan table pobiera wiersze. Wszystkie przyszłe zapytania obejmujące tę column otrzymają wynik z oceny funkcji w column, zastępując oryginalną wartość columnjego nową wartością.

Aby uzyskać więcej informacji na temat używania filtrów wierszy i maski column, zobacz Filtruj poufne dane table przy użyciu filtrów wierszy i maski column.

Zarządzanie filtrami wierszy i maskami Column

Filtry wierszy i maski column w tables przesyłania strumieniowego powinny zostać dodane, zaktualizowane lub usunięte za pomocą instrukcji CREATE OR REFRESH.

Zachowanie

  • Refresh jako definer: Kiedy instrukcje CREATE OR REFRESH lub REFRESHrefresh strumień table, funkcje filtrowania wierszy działają z uprawnieniami definiującego (jako właściciela table). Oznacza to, że tablerefresh używa kontekstu zabezpieczeń użytkownika, który utworzył przesyłanie strumieniowe table.
  • Zapytanie: Podczas gdy większość filtrów jest uruchamiana z prawami definiowanego, funkcje sprawdzające kontekst użytkownika (takie jak CURRENT_USER i IS_MEMBER) są wyjątkami. Te funkcje działają jako wywołanie. Takie podejście wymusza zabezpieczenia danych specyficzne dla użytkownika i mechanizmy kontroli dostępu na podstawie kontekstu bieżącego użytkownika.

Wgląd w informacje

Użyj DESCRIBE EXTENDED, INFORMATION_SCHEMAlub Eksploratora Catalog, aby zbadać istniejące filtry wierszy i maski column, które mają zastosowanie do danego przesyłania strumieniowego table. Ta funkcja umożliwia użytkownikom inspekcję i przegląd dostępu i ochrony danych w przesyłaniu strumieniowym tables.

Ograniczenia

  • Tylko właściciele table mogą refresh strumieniować tables do get najnowszych danych.
  • polecenia ALTER TABLE są niedozwolone w przesyłaniu strumieniowym tables. Definicja i właściwości table powinny zostać zmienione za pomocą instrukcji CREATE OR REFRESH lub ALTER STREAMING TABLE.
  • Rozwijanie tableschema za pomocą poleceń DML, takich jak INSERT INTO, i MERGE nie jest obsługiwane.
  • Następujące polecenia nie są obsługiwane w przesyłaniu strumieniowym tables:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Udostępnianie różnicowe nie jest obsługiwane.
  • Zmiana nazwy table lub zmiana właściciela nie jest obsługiwana.
  • Table ograniczenia, takie jak PRIMARY KEY i FOREIGN KEY, nie są obsługiwane.
  • Wygenerowane columns, tożsamość columnsi domyślne columns nie są obsługiwane.

Przykłady

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10)

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')