Dela via


Sql-språkreferens för Delta Live Tables

Den här artikeln innehåller information om SQL-programmeringsgränssnittet för Delta Live Tables.

  • Information om Python-API:et finns i python-språkreferensen Delta Live Tables.
  • Mer information om SQL-kommandon finns i SQL-språkreferens.

Du kan använda användardefinierade Python-funktioner (UDF: er) i dina SQL-frågor, men du måste definiera dessa UDF:er i Python-filer innan du anropar dem i SQL-källfiler. Se Användardefinierade skalärfunktioner – Python.

Begränsningar

Satsen PIVOT stöds inte. Åtgärden pivot i Spark kräver ivrig inläsning av indata för att beräkna utdataschemat. Den här funktionen stöds inte i Delta Live Tables.

Skapa en materialiserad vy eller en strömmande tabell för Delta Live Tables

Kommentar

  • Syntaxen CREATE OR REFRESH LIVE TABLE för att skapa en materialiserad vy är inaktuell. Använd CREATE OR REFRESH MATERIALIZED VIEWi stället .
  • Om du vill använda CLUSTER BY -satsen för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.

Du använder samma grundläggande SQL-syntax när du deklarerar en strömmande tabell eller en materialiserad vy.

Deklarera en materialiserad Delta Live Tables-vy med SQL

Följande beskriver syntaxen för att deklarera en materialiserad vy i Delta Live Tables med SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Deklarera en Delta Live Tables-strömningstabell med SQL

Du kan bara deklarera strömmande tabeller med hjälp av frågor som läse mot en strömmande källa. Databricks rekommenderar att du använder Auto Loader för strömmande inmatning av filer från molnobjektlagring. Se SQL-syntax för automatisk inläsning.

När du anger andra tabeller eller vyer i din pipeline som strömmande källor måste du inkludera STREAM() funktionen runt ett datauppsättningsnamn.

Följande beskriver syntaxen för att deklarera en strömningstabell i Delta Live Tables med SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Skapa en Delta Live Tables-vy

Följande beskriver syntaxen för att deklarera vyer med SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

SQL-syntax för automatisk inläsning

Följande beskriver syntaxen för att arbeta med automatisk inläsning i SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Du kan använda formatalternativ som stöds med Auto Loader. Med hjälp av map() funktionen kan du skicka alternativ till read_files() metoden. Alternativen är nyckel/värde-par, där nycklar och värden är strängar. Mer information om stödformat och alternativ finns i Alternativ för filformat.

Exempel: Definiera tabeller

Du kan skapa en datauppsättning genom att läsa från en extern datakälla eller från datauppsättningar som definierats i en pipeline. Om du vill läsa från en intern datauppsättning förbereder du nyckelordet LIVE till datamängdens namn. I följande exempel definieras två olika datauppsättningar: en tabell med namnet taxi_raw som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data som tar taxi_raw tabellen som indata:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exempel: Läsa från en strömmande källa

Om du vill läsa data från en strömmande källa, till exempel Automatisk inläsare eller en intern datauppsättning, definierar du en STREAMING tabell:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Mer information om strömmande data finns i Transformera data med Delta Live Tables.

Kontrollera hur tabeller materialiseras

Tabeller ger också ytterligare kontroll över materialiseringen:

Kommentar

För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att Delta Live Tables kan styra dataorganisationen. Om du inte förväntar dig att tabellen ska växa utöver en terabyte rekommenderar Databricks att du inte anger partitionskolumner.

Exempel: Ange ett schema och partitionskolumner

Du kan också ange ett schema när du definierar en tabell. I följande exempel anges schemat för måltabellen, inklusive att använda Delta Lake-genererade kolumner och definiera partitionskolumner för tabellen:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Som standard härleder Delta Live Tables schemat från table definitionen om du inte anger något schema.

Exempel: Definiera tabellbegränsningar

Kommentar

Delta Live Tables-stöd för tabellbegränsningar finns i offentlig förhandsversion. För att definiera tabellbegränsningar måste din pipeline vara en Unity Catalog-aktiverad pipeline och konfigurerad för att använda preview kanalen.

När du anger ett schema kan du definiera primära och externa nycklar. Begränsningarna är informationsmässiga och tillämpas inte. Se BEGRÄNSNINGssatsen i SQL-språkreferensen.

I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parameterisera värden som används vid deklarering av tabeller eller vyer med SQL

Använd SET för att ange ett konfigurationsvärde i en fråga som deklarerar en tabell eller vy, inklusive Spark-konfigurationer. Alla tabeller eller vyer som du definierar i en notebook-fil efter att instruktionen SET har åtkomst till det definierade värdet. Alla Spark-konfigurationer som anges med instruktionen SET används när du kör Spark-frågan för en tabell eller vy som följer SET-instruktionen. Om du vill läsa ett konfigurationsvärde i en fråga använder du syntaxen för ${}stränginterpolation . I följande exempel anges ett Spark-konfigurationsvärde med namnet startDate och värdet används i en fråga:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Om du vill ange flera konfigurationsvärden använder du en separat SET instruktion för varje värde.

Exempel: Definiera ett radfilter och en kolumnmask

Viktigt!

Radfilter och kolumnmasker finns i offentlig förhandsversion.

Om du vill skapa en materialiserad vy eller en strömningstabell med ett radfilter och en kolumnmask använder du ROW FILTER-satsen och MASK-satsen. I följande exempel visas hur du definierar en materialiserad vy och en strömningstabell med både ett radfilter och en kolumnmask:

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Mer information om radfilter och kolumnmasker finns i Publicera tabeller med radfilter och kolumnmasker.

SQL-egenskaper

Kommentar

Om du vill använda CLUSTER BY -satsen för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.

SKAPA TABELL ELLER VY
TEMPORARY

Skapa en tabell men publicera inte metadata för tabellen. TEMPORARY Satsen instruerar Delta Live Tables att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en tillfällig tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.
STREAMING

Skapa en tabell som läser en indatauppsättning som en dataström. Indatauppsättningen måste vara en strömmande datakälla, till exempel automatisk inläsning eller en STREAMING tabell.
CLUSTER BY

Aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar.

Se Använda flytande klustring för Delta-tabeller.
PARTITIONED BY

En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen.
LOCATION

En valfri lagringsplats för tabelldata. Om det inte anges kommer systemet som standard att vara platsen för pipelinelagringen.
COMMENT

En valfri beskrivning för tabellen.
column_constraint

En valfri informations primärnyckel eller sekundärnyckelbegränsning för kolumnen.
MASK clause (Offentlig förhandsversion)

Lägger till en kolumnmaskfunktion för att anonymisera känsliga data. Framtida frågor för kolumnen returnerar den utvärderade funktionens resultat i stället för kolumnens ursprungliga värde. Detta är användbart för detaljerad åtkomstkontroll, eftersom funktionen kan kontrollera användarens identitets- och gruppmedlemskap för att avgöra om värdet ska redigeras.

Se Kolumnmasksats.
table_constraint

En valfri informations primärnyckel eller sekundärnyckelbegränsning i tabellen.
TBLPROPERTIES

En valfri lista över tabellegenskaper för tabellen.
WITH ROW FILTER clause (Offentlig förhandsversion)

Lägger till en radfilterfunktion i tabellen. Framtida frågor för tabellen tar emot en delmängd av de rader som funktionen utvärderas till TRUE för. Detta är användbart för detaljerad åtkomstkontroll eftersom den gör att funktionen kan kontrollera identitets- och gruppmedlemskapen för den anropande användaren för att avgöra om vissa rader ska filtreras.

Se ROW FILTER-sats.
select_statement

En Delta Live Tables-fråga som definierar datauppsättningen för tabellen.
CONSTRAINT-sats
EXPECT expectation_name

Definiera villkor för expectation_namedatakvalitet . Om villkoret ON VIOLATION inte har definierats lägger du till rader som bryter mot villkoret i måldatauppsättningen.
ON VIOLATION

Valfri åtgärd att vidta för misslyckade rader:

- FAIL UPDATE: Stoppa omedelbart pipelinekörningen.
- DROP ROW: Släpp posten och fortsätt bearbetningen.

Ändra datainsamling med SQL i Delta Live Tables

Använd -instruktionen APPLY CHANGES INTO för att använda DELTA Live Tables CDC-funktioner enligt beskrivningen i följande:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Du definierar datakvalitetsbegränsningar för ett APPLY CHANGES mål med samma CONSTRAINT sats som icke-frågorAPPLY CHANGES . Se Hantera datakvalitet med Delta Live Tables.

Kommentar

Standardbeteendet för INSERT och UPDATE händelser är att uppdatera CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen APPLY CHANGES måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by .

Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.

Satser
KEYS

Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

Om du vill definiera en kombination av kolumner använder du en kommaavgränsad lista med kolumner.

Den här satsen krävs.
IGNORE NULL UPDATES

Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och IGNORE NULL UPDATES har angetts behåller kolumner med en null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null.

Den här satsen är valfri.

Standardvärdet är att skriva över befintliga kolumner med null värden.
APPLY AS DELETE WHEN

Anger när en CDC-händelse ska behandlas som en DELETE i stället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSecondstabellegenskap.

Den här satsen är valfri.
APPLY AS TRUNCATE WHEN

Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

APPLY AS TRUNCATE WHEN Satsen stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkeringsåtgärden.

Den här satsen är valfri.
SEQUENCE BY

Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Delta Live Tables använder den här sekvenseringen för att hantera ändringshändelser som kommer i fel ordning.

Den angivna kolumnen måste vara en sorterbar datatyp.

Den här satsen krävs.
COLUMNS

Anger en delmängd av kolumner som ska inkluderas i måltabellen. Du kan antingen:

– Ange den fullständiga listan med kolumner som ska inkluderas: COLUMNS (userId, name, city).
– Ange en lista med kolumner som ska undantas: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen COLUMNS när satsen inte har angetts.
STORED AS

Om poster ska lagras som SCD-typ 1 eller SCD typ 2.

Den här satsen är valfri.

Standardvärdet är SCD typ 1.
TRACK HISTORY ON

Anger en delmängd av utdatakolumner för att generera historikposter när det finns ändringar i de angivna kolumnerna. Du kan antingen:

– Ange den fullständiga listan med kolumner som ska spåras: COLUMNS (userId, name, city).
– Ange en lista över kolumner som ska undantas från spårning: COLUMNS * EXCEPT (operation, sequenceNum)

Den här satsen är valfri. Standardvärdet är att spåra historiken för alla utdatakolumner när det finns ändringar, motsvarande TRACK HISTORY ON *.