Condividi tramite


Informazioni di riferimento sul linguaggio SQL di Delta Live Tables

Questo articolo include informazioni dettagliate sull'interfaccia di programmazione SQL di Delta Live Tables.

È possibile usare funzioni definite dall'utente Python nelle query SQL, ma è necessario definire queste funzioni definite dall'utente nei file Python prima di chiamarle nei file di origine SQL. Vedere Funzioni scalari definite dall'utente - Python.

Limiti

La clausola PIVOT non è supportata. L'operazione pivot in Spark richiede il caricamento anticipato dei dati di input per calcolare l'output schema. Questa funzionalità non è supportata in Delta Live Tables.

Creare una vista Tables delta live materializzata o table di streaming

Nota

  • La sintassi CREATE OR REFRESH LIVE TABLE per creare una vista materializzata è deprecata. Usare invece CREATE OR REFRESH MATERIALIZED VIEW.
  • Per usare la clausola CLUSTER BY per abilitare il clustering liquido, la pipeline deve essere configurata per l'uso del canale di anteprima.

Quando si dichiara un flusso table o una vista materializzata, si utilizza la stessa sintassi SQL di base.

Dichiarare una vista Delta Live Tables materializzata con SQL

Di seguito viene descritta la sintassi per dichiarare una vista materializzata in Delta Live Tables con SQL:

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Dichiarare uno streaming Delta Live Tables con SQL table

È possibile dichiarare lo streaming tables solo utilizzando query che leggono da un'origine di streaming. Databricks consiglia di usare il caricatore automatico per l'inserimento in streaming di file dall'archiviazione di oggetti cloud. Vedere sintassi SQL del caricatore automatico.

Quando si specificano altri tables o views nella pipeline come origini di streaming, è necessario includere la funzione STREAM() intorno al nome del set di dati.

Di seguito viene descritta la sintassi per dichiarare un elemento di streaming table in Delta Live Tables utilizzando SQL:

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Creare una visualizzazione Delta Live Tables

Di seguito viene descritta la sintassi per dichiarare views con SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintassi SQL del caricatore automatico

Di seguito viene descritta la sintassi per l'uso del caricatore automatico in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

È possibile usare le opzioni di formato supportate con Il caricatore automatico. Usando la map() funzione , è possibile passare le opzioni al read_files() metodo . Le opzioni sono sotto forma di coppie chiave-valore, where le chiavi e values sono stringhe. Per informazioni dettagliate sui formati e le opzioni di supporto, vedere Opzioni di formato file.

Esempio: Definire tables

È possibile creare un set di dati leggendo da un'origine dati esterna o da set di dati definiti in una pipeline. Per leggere da un set di dati interno, anteporre la parola chiave LIVE al nome del set di dati. L'esempio seguente definisce due set di dati diversi: un table denominato taxi_raw che accetta un file JSON come origine di input e un table denominato filtered_data che accetta il taxi_rawtable come input:

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Esempio: Leggere da un'origine di streaming

Per leggere i dati da un'origine di streaming, ad esempio Auto Loader o un set di dati interno, definire un STREAMINGtable:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Per ulteriori informazioni sullo streaming dei dati, vedere Trasformazione dei dati con Delta Live Tables.

Controllare il modo in cui tables vengono materializzati

Tables offrono anche un controllo aggiuntivo sulla loro materializzazione:

  • Specificare il modo in cui tables sono partizionati usando PARTITIONED BY. È possibile usare il partizionamento per velocizzare le query.
  • È possibile settable proprietà usando TBLPROPERTIES. Vedi le proprietà Delta Live Tablestable.
  • Set una posizione di archiviazione usando l'impostazione LOCATION. Per impostazione predefinita, i dati table vengono archiviati nell'ubicazione di archiviazione della pipeline se LOCATION non è set.
  • È possibile usare columns generate nella definizione di schema. Vedere esempio: Specificare un schema e partitioncolumns.

Nota

Per dimensioni di tables inferiori a 1 TB, Databricks consiglia di lasciare che Delta Live Tables gestisca l'organizzazione dei dati. A meno che non ti aspetti che il tuo table cresca oltre un terabyte, Databricks consiglia di non specificare partitioncolumns.

Esempio : specificare un schema e un partitionecolumns

Facoltativamente, è possibile specificare un schema quando si definisce un table. Nel seguente esempio viene specificato il schema per il tabledi destinazione, compreso l'uso di Delta Lake generato columns e la definizione di partitioncolumns per l'table:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Per impostazione predefinita, Delta Live Tables deduce il schema dalla definizione di table se non si specifica un schema.

Esempio di : Definire vincoli di table

Nota

Il supporto Delta Live Tables per i vincoli di table è in anteprima pubblica . Per definire i vincoli table, la pipeline deve essere una pipeline Catalogabilitata per Unity e configurata per l'uso del canale preview.

Quando si specifica un schema, è possibile definire chiavi primarie ed esterne. I vincoli sono informativi e non vengono applicati. Consulta la clausola CONSTRAINT nel riferimento al linguaggio SQL.

L'esempio seguente definisce un table con una chiave primaria ed esterna constraint:

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Parametrizzare values usato per dichiarare tables o views con SQL

Usare SET per specificare un valore di configurazione in una query che dichiara un table o una vista, incluse le configurazioni Spark. Qualsiasi table o vista che definisci in un notebook dopo l'istruzione SET ha accesso al valore definito. Tutte le configurazioni Spark specificate usando l'istruzione SET vengono usate durante l'esecuzione della query Spark per qualsiasi table o vista successivamente all'istruzione SET. Per leggere un valore di configurazione in una query, usare la sintassi di interpolazione di stringhe ${}. L'esempio seguente imposta un valore di configurazione Spark denominato startDate e usa tale valore in una query:

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Per specificare più configurazioni values, utilizzare un'istruzione separata SET per ogni valore.

Esempio: definire un filtro di riga e una maschera column

Importante

I filtri di riga e le maschere di column si trovano in anteprima pubblica.

Per creare una vista materializzata o uno streaming table con un filtro di riga e una maschera column, utilizzare le clausole ROW FILTER e MASK. Nell'esempio seguente viene illustrato come definire una vista materializzata e uno Streaming table con un filtro di riga e una maschera column.

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Per altre informazioni sui filtri di riga e sulle maschere di column, vedere Pubblicare tables con filtri di riga e maschere column.

Proprietà SQL

Nota

Per usare la clausola CLUSTER BY per abilitare il clustering liquido, la pipeline deve essere configurata per l'uso del canale di anteprima.

CREATE TABLE o VISUALIZZA
TEMPORARY

Creare un table ma non pubblicare i metadati per il table. La clausola TEMPORARY indica a Delta Live Tables di creare un table disponibile per la pipeline, ma non deve essere accessibile all'esterno della pipeline. Per ridurre il tempo di elaborazione, un table temporaneo persiste per tutto il periodo in cui esiste la pipeline che lo crea, e non soltanto per un singolo update.
STREAMING

Creare un table che legge un set di dati di input come flusso. Il set di dati di input deve essere un'origine dati di streaming, ad esempio Auto Loader o un STREAMINGtable.
CLUSTER BY

Abilitare il clustering liquido nella table e definire columns da usare come chiave di clustering.

Vedi Usa il clustering liquido per Delta tables.
PARTITIONED BY

Un list facoltativo di uno o più columns da utilizzare per partizionare il table.
LOCATION

Posizione di archiviazione facoltativa per i dati di table. Se non set, il sistema utilizzerà di default il percorso di archiviazione della pipeline.
COMMENT

Descrizione facoltativa per il table.
column_constraint

Una chiave primaria informativa facoltativa o una chiave esterna constraint nel column.
MASK clause (Anteprima pubblica)

Aggiunge una funzione maschera column per rendere anonimi i dati sensibili. Le query future per tale column restituiscono il risultato della funzione valutata anziché il valore originale del column. Ciò è utile per il controllo di accesso con granularità fine, perché la funzione può controllare l'identità dell'utente e le appartenenze ai gruppi per decidere se redigere il valore.

Vedere la clausola maschera Column.
table_constraint

Una chiave primaria informativa facoltativa o una chiave esterna constraint nel table.
TBLPROPERTIES

Un list facoltativo delle proprietà di table per l'table.
WITH ROW FILTER clause (Anteprima pubblica)

Aggiunge una funzione di filtro di riga al table. Le future query per table ricevono un sottoinsieme delle righe per cui la funzione restituisce TRUE. Ciò è utile per il controllo di accesso con granularità fine, perché consente alla funzione di controllare l'identità e le appartenenze ai gruppi dell'utente che richiama per decidere se filtrare determinate righe.

Vedere la clausola ROW FILTER.
select_statement

Una query Delta Live Tables che definisce il set di dati per table.
clausola CONSTRAINT
EXPECT expectation_name

Definire la qualità dei dati constraintexpectation_name. Se il ON VIOLATIONconstraint non è definito, aggiungere righe che violano il constraint al set di dati di destinazione.
ON VIOLATION

Azione facoltativa da eseguire per le righe non riuscite:

- FAIL UPDATE: arresta immediatamente l'esecuzione della pipeline.
- DROP ROW: eliminare il record e continuare l'elaborazione.

Change Data Capture con SQL in Delta Live Tables

Usare l'istruzione APPLY CHANGES INTO per usare la funzionalità Delta Live Tables CDC, come descritto di seguito:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

I vincoli di qualità dei dati per una destinazione APPLY CHANGES vengono definiti usando la stessa clausola CONSTRAINT delle query non APPLY CHANGES. Consulta Gestire la qualità dei dati con Delta Live Tables.

Nota

Il comportamento predefinito per gli eventi INSERT e UPDATE consiste nel upsert eventi CDC dall'origine: update tutte le righe nel table di destinazione che corrispondono alle chiavi specificate o insert una nuova riga quando un record corrispondente non esiste nel tabledi destinazione. La gestione degli eventi DELETE può essere specificata con la condizione APPLY AS DELETE WHEN.

Importante

È necessario dichiarare un target di streaming table a cui applicare le modifiche. Facoltativamente, è possibile specificare il schema per il tabledi destinazione. Quando si specifica la schema del tabledi destinazione APPLY CHANGES , è necessario includere anche il __START_AT e __END_ATcolumns con lo stesso tipo di dati del campo sequence_by.

Consulta delle API APPLY CHANGES: Semplificare la cattura delle modifiche dei dati con Delta Live Tables.

Clausole
KEYS

column o combinazione di columns che identificano in modo univoco una riga nei dati di origine. Viene usato per identificare quali eventi CDC si applicano a record specifici nel tabledi destinazione.

Per definire una combinazione di columns, usare un list delimitato da virgole di columns.

Questa clausola è obbligatoria.
IGNORE NULL UPDATES

Consentire l'inserimento di aggiornamenti contenenti un subset del columnsdi destinazione. Quando un evento CDC corrisponde a una riga esistente e viene specificato IGNORE NULL UPDATES, columns con un null manterrà il proprio values esistente nella destinazione. Questo vale anche per il columns annidato con un valore di null.

La clausola è facoltativa.

Il valore predefinito è sovrascrivere columns esistente con nullvalues.
APPLY AS DELETE WHEN

Specifica quando un evento CDC deve essere considerato come un DELETE anziché un upsert. Per gestire i dati non ordinati, la riga eliminata viene temporaneamente mantenuta come marcatore di cancellazione nel Delta sottostante tablee viene creata una vista nel metastore che filtra questi segnali di cancellazione. L'intervallo di conservazione può essere configurato con le
pipelines.cdc.tombstoneGCThresholdInSeconds table proprietà.

La clausola è facoltativa.
APPLY AS TRUNCATE WHEN

Specifica quando un evento CDC deve essere considerato come un tableTRUNCATEcompleto. Poiché questa clausola attiva un troncamento completo del tabledi destinazione , deve essere usata solo per casi d'uso specifici che richiedono questa funzionalità.

La clausola APPLY AS TRUNCATE WHEN è supportato solo per SCD di tipo 1. Il tipo SCD 2 non supporta l’operazione di troncamento.

La clausola è facoltativa.
SEQUENCE BY

Il nome column specifica l'ordine logico degli eventi CDC nei dati di origine. Delta Live Tables usa questa sequenziazione per gestire gli eventi di modifica che arrivano non in ordine.

Il column specificato deve essere un tipo di dati ordinabile.

Questa clausola è obbligatoria.
COLUMNS

Specifica un sottoinsieme di columns da includere nel tabledi destinazione. È possibile:

- Specificare il list completo del columns da includere: COLUMNS (userId, name, city).
- Impostare un list di columns per escludere: COLUMNS * EXCEPT (operation, sequenceNum)

La clausola è facoltativa.

L'impostazione predefinita è quella di includere tutte le columns nel table di destinazione quando non è specificata la clausola COLUMNS.
STORED AS

Indica se archiviare i record come SCCD di tipo 1 o SCD di tipo 2.

La clausola è facoltativa.

L'impostazione predefinita è il tipo 1.
TRACK HISTORY ON

Specifica un sottoinsieme dei record di cronologia di output da columns a generate quando ci sono modifiche a quei columnsspecificati. È possibile:

- Specificare il list completo del columns da monitorare: COLUMNS (userId, name, city).
- Specificare un list di columns da escludere dal rilevamento: COLUMNS * EXCEPT (operation, sequenceNum)

La clausola è facoltativa. L'impostazione predefinita è di tenere traccia della cronologia per tutti i dati di output columns ogni volta che ci sono modifiche, equivalenti a TRACK HISTORY ON *.