CREATE STREAMING TABLE
Dotyczy: Databricks SQL
Tworzy przesyłania strumieniowego tableoraz Deltę table z dodatkową obsługą przesyłania strumieniowego lub przyrostowego przetwarzania danych.
tables przesyłania strumieniowego są obsługiwane tylko w usłudze Delta Live Tables i w usłudze Databricks SQL z programem Unity Catalog. Uruchomienie tego polecenia w obsługiwanym środowisku Databricks Runtime oblicza tylko składnię. Zobacz Tworzenie kodu potoku przy użyciu języka SQL.
Składnia
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL |
COMMENT column_comment |
column_constraint |
MASK clause } [ ... ]
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] schedule_clause |
WITH { ROW FILTER clause } } [...]
schedule_clause
{ EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
CRON cron_string [ AT TIME ZONE timezone_id ] }
Parameters
REFRESH
Jeśli sprecyzowano, odświeży table z najnowszymi danymi dostępnymi ze źródeł zdefiniowanych w zapytaniu. Tylko nowe dane, które docierają przed rozpoczęciem zapytania, są przetwarzane. Nowe dane dodawane do źródeł podczas wykonywania polecenia są ignorowane do następnego refresh. Operacja refresh z funkcji CREATE OR REFRESH jest w pełni deklaratywna. Jeśli polecenie refresh nie określa wszystkich metadanych z oryginalnej instrukcji tworzenia table, nieokreślone metadane zostaną usunięte.
JEŚLI NIE ISTNIEJE
Tworzy przesył strumieniowy table, jeśli nie istnieje. Jeśli table o tej nazwie już istnieje, instrukcja
CREATE STREAMING TABLE
jest ignorowana.Możesz określić co najwyżej jeden z
IF NOT EXISTS
elementów lubOR REFRESH
.-
Nazwa dla table do utworzenia. Nazwa nie może zawierać specyfikacji czasowej ani specyfikacji opcji. Jeśli nazwa nie jest kwalifikowana, table zostanie utworzona w bieżącym schema.
table_specification
Ta opcjonalna klauzula definiuje listcolumns, ich typy, właściwości, opisy oraz ograniczenia column.
Jeśli nie zdefiniujesz columns w tableschema należy określić
AS query
.-
Unikatowa nazwa dla column.
-
Określa typ danych dla column.
NOT NULL
Jeśli określono, column nie akceptuje
NULL
values.COLUMN_COMMENT KOMENTARZ
Ciąg znaków opisujący column.
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje klucz podstawowy lub klucz obcy constraint do column w przesyłaniu strumieniowym table. Ograniczenia nie są obsługiwane w przypadku tables w
hive_metastore
catalog. -
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje funkcję maski column do anonimizacji poufnych danych. Wszystkie kolejne zapytania z tej column otrzymują wynik oceny tej funkcji w column zamiast oryginalnej wartości column. Może to być przydatne w celach szczegółowej kontroli dostępu, where funkcja może sprawdzić tożsamość lub członkostwo w grupach wywoływanego użytkownika, aby zdecydować, czy zredagować wartość.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ PRZY NARUSZENIU { FAIL UPDATE | DROP ROW } ]
Dodaje oczekiwania dotyczące jakości danych do table. Te oczekiwania jakości danych można śledzić w czasie i uzyskiwać do nich dostęp za pośrednictwem dziennika zdarzeń przesyłania strumieniowego table. Oczekiwanie
FAIL UPDATE
powoduje awarię przetwarzania przy jednoczesnym tworzeniu table i odświeżaniu table. OczekiwanieDROP ROW
powoduje porzucenie całego wiersza, jeśli oczekiwanie nie zostanie spełnione.expectation_expr
mogą składać się z literałów, identyfikatorów column wewnątrz tableoraz deterministycznych, wbudowanych funkcji SQL lub operatorów, z wyjątkiem:-
Agregujących
- funkcje analityczne window
- funkcje window rankingowe
- Table cenionych funkcji generatora
Ponadto
expr
nie może zawierać żadnego podzapytania.-
Agregujących
-
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje ograniczenia informacyjnego klucza podstawowego lub informacyjnego klucza obcego do strumienia danych table. Ograniczenia klucza nie są obsługiwane w przypadku tables w
hive_metastore
catalog.
-
-
table_clauses
Opcjonalnie określ partycjonowanie, komentarze, właściwości zdefiniowane przez użytkownika i harmonogram refresh dla nowego table. Każda klauzula podrzędna może być określona tylko raz.
-
Opcjonalna list z columns do table, aby partitiontable.
TABLE_COMMENT KOMENTARZ
Literał
STRING
, aby opisać table.-
Opcjonalnie ustawia co najmniej jedną właściwość zdefiniowaną przez użytkownika.
Użyj tego ustawienia, aby określić kanał środowiska uruchomieniowego usługi Delta Live Tables używany do uruchamiania tej instrukcji. Ustaw Set wartość właściwości
pipelines.channel
na"PREVIEW"
lub"CURRENT"
. Domyślna wartość to"CURRENT"
. Aby uzyskać więcej informacji na temat kanałów usługi Delta Live Tables, zobacz kanały środowiska uruchomieniowego usługi Delta Live Tables. HARMONOGRAM [ REFRESH ] klauzula_harmonogramu
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
Aby zaplanować refresh, które występuje okresowo, użyj składni
EVERY
. Jeśli określono składnięEVERY
, widok przesyłania strumieniowego table lub zmaterializowany widok jest okresowo odświeżany w określonym interwale w oparciu o podaną wartość, taką jakHOUR
,HOURS
,DAY
,DAYS
,WEEK
lubWEEKS
. Poniższa table zawiera listę akceptowanych liczb całkowitych values dlanumber
.Time unit Wartość całkowita HOUR or HOURS
1 <= H <= 72 DAY or DAYS
1 <= D <= 31 WEEK or WEEKS
1 <= W <= 8 Uwaga
Liczba pojedyncza i mnoga dołączonej jednostki czasowej są semantycznie równoważne.
CRON cron_string [ AT TIME ZONE timezone_id ]
Aby zaplanować refresh przy użyciu wartości cron. Akceptowane są prawidłowe time_zone_values .
AT TIME ZONE LOCAL
nie jest obsługiwana.Jeśli
AT TIME ZONE
jest nieobecny, używana jest strefa czasowa sesji. JeśliAT TIME ZONE
jest nieobecny, a strefa czasowa sesji nie jest set, zostanie zgłoszony błąd.SCHEDULE
jest semantycznie równoważne .SCHEDULE REFRESH
Harmonogram można podać w ramach
CREATE
polecenia . Użyj ALTER STREAMING TABLE lub uruchom polecenieCREATE OR REFRESH
z klauzuląSCHEDULE
, aby zmienić harmonogram przesyłania strumieniowego dla table po jego utworzeniu.-
Z klauzulą ROW FILTER
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Dodaje funkcję filtrowania wierszy do table. Wszystkie kolejne zapytania z tego table uzyskują podzbiór wierszy where, które funkcja ocenia jako wartość logiczną TRUE. Może to być przydatne w celach szczegółowej kontroli dostępu, where funkcja może sprawdzić tożsamość lub członkostwo w grupach wywoływanego użytkownika, aby zdecydować, czy filtrować niektóre wiersze.
-
Ta klauzula wypełnia table przy użyciu danych z
query
. To zapytanie musi być zapytaniem przesyłanym strumieniowo. Można to osiągnąć, dodającSTREAM
słowo kluczowe do dowolnej relacji, którą chcesz przetwarzać przyrostowo. Po określeniuquery
itable_specification
razem tableschema określony wtable_specification
musi zawierać wszystkie columns zwrócone przezquery
, w przeciwnym razie get błąd. Wszystkie columns określone wtable_specification
, ale niezwracane przezquery
, zwracająnull
values podczas wykonywania zapytania.
Różnice między przesyłaniem strumieniowym tables a innymi tables
Przesyłanie strumieniowe tables jest stanowe tables, zaprojektowane do jednokrotnej obsługi każdej pozycji w miarę przetwarzania rosnącego zestawu danych. Ponieważ większość zestawów danych stale rośnie wraz z upływem czasu, przesyłanie strumieniowe tables jest dobre dla większości obciążeń pozyskiwania. Strumieniowanie tables jest optymalne dla potoków wymagających świeżości danych i niskich opóźnień. Przesyłanie strumieniowe tables może być również przydatne w przypadku transformacji na dużą skalę, ponieważ wyniki mogą być obliczane przyrostowo po nadejściu nowych danych, zapewniając aktualizację wyników bez konieczności pełnego ponownego przeliczania wszystkich danych źródłowych z każdym update. Przesyłanie strumieniowe tables jest przeznaczone dla źródeł danych, które są tylko do odczytu i dodawania.
Usługa przesyłania strumieniowego tables akceptuje dodatkowe polecenia, takie jak REFRESH
, które przetwarzają najnowsze dane dostępne w źródłach podanych w zapytaniu. Zmiany w podanym zapytaniu get są odzwierciedlane tylko w nowych danych przez wywołanie REFRESH
, których nie przetwarzano wcześniej. Aby zastosować zmiany w istniejących danych, należy wykonać REFRESH TABLE <table_name> FULL
polecenie , aby wykonać polecenie FULL REFRESH
. Pełne odświeżenia ponownie przetwarzają wszystkie dane dostępne w źródle przy użyciu najnowszej definicji. Nie zaleca się wywoływania pełnych odświeżeń w źródłach, które nie przechowują całej historii danych lub mają krótkie okresy przechowywania, takie jak Kafka, ponieważ pełne refresh obcina istniejące dane. Odzyskanie starych danych może nie być możliwe, jeśli dane nie są już dostępne w źródle.
Filtry wierszy i maski column
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Filtry wierszy umożliwiają określenie funkcji, która ma zastosowanie jako filtr za każdym razem, gdy skanowanie table pobiera wiersze. Te filtry zapewniają, że kolejne zapytania zwracają tylko wiersze, dla których predykat filtru daje wartość true.
Maski Column pozwalają zamaskować values w columnza każdym razem, gdy skan table pobiera wiersze. Wszystkie przyszłe zapytania obejmujące tę column otrzymają wynik z oceny funkcji w column, zastępując oryginalną wartość columnjego nową wartością.
Aby uzyskać więcej informacji na temat używania filtrów wierszy i maski column, zobacz Filtruj poufne dane table przy użyciu filtrów wierszy i maski column.
Zarządzanie filtrami wierszy i maskami Column
Filtry wierszy i maski column w tables przesyłania strumieniowego powinny zostać dodane, zaktualizowane lub usunięte za pomocą instrukcji CREATE OR REFRESH
.
Zachowanie
-
Refresh jako definer: Kiedy instrukcje
CREATE OR REFRESH
lubREFRESH
refresh strumień table, funkcje filtrowania wierszy działają z uprawnieniami definiującego (jako właściciela table). Oznacza to, że tablerefresh używa kontekstu zabezpieczeń użytkownika, który utworzył przesyłanie strumieniowe table. -
Zapytanie: Podczas gdy większość filtrów jest uruchamiana z prawami definiowanego, funkcje sprawdzające kontekst użytkownika (takie jak
CURRENT_USER
iIS_MEMBER
) są wyjątkami. Te funkcje działają jako wywołanie. Takie podejście wymusza zabezpieczenia danych specyficzne dla użytkownika i mechanizmy kontroli dostępu na podstawie kontekstu bieżącego użytkownika.
Wgląd w informacje
Użyj DESCRIBE EXTENDED
, INFORMATION_SCHEMA
lub Eksploratora Catalog, aby zbadać istniejące filtry wierszy i maski column, które mają zastosowanie do danego przesyłania strumieniowego table. Ta funkcja umożliwia użytkownikom inspekcję i przegląd dostępu i ochrony danych w przesyłaniu strumieniowym tables.
Ograniczenia
- Tylko właściciele table mogą refresh strumieniować tables do get najnowszych danych.
- polecenia
ALTER TABLE
są niedozwolone w przesyłaniu strumieniowym tables. Definicja i właściwości table powinny zostać zmienione za pomocą instrukcjiCREATE OR REFRESH
lub ALTER STREAMING TABLE. - Rozwijanie tableschema za pomocą poleceń DML, takich jak
INSERT INTO
, iMERGE
nie jest obsługiwane. - Następujące polecenia nie są obsługiwane w przesyłaniu strumieniowym tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- Udostępnianie różnicowe nie jest obsługiwane.
- Zmiana nazwy table lub zmiana właściciela nie jest obsługiwana.
-
Table ograniczenia, takie jak
PRIMARY KEY
iFOREIGN KEY
, nie są obsługiwane. - Wygenerowane columns, tożsamość columnsi domyślne columns nie są obsługiwane.
Przykłady
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
TBLPROPERTIES(pipelines.channel = "PREVIEW")
AS SELECT * FROM RANGE(10)
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE EVERY 1 HOUR
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
id int,
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT *
FROM STREAM read_files('s3://bucket/path/sensitive_data')