Condividi tramite


Sviluppare codice della pipeline con SQL

Le tabelle live delta introduce diverse nuove parole chiave e funzioni SQL per la definizione di viste materializzate e tabelle di streaming nelle pipeline. Il supporto SQL per lo sviluppo di pipeline si basa sulle nozioni di base di Spark SQL e aggiunge il supporto per la funzionalità Structured Streaming.

Gli utenti che hanno familiarità con i dataframe PySpark potrebbero preferire lo sviluppo di codice della pipeline con Python. Python supporta test e operazioni più estese che sono difficili da implementare con SQL, ad esempio le operazioni di metaprogrammazione. Vedere Sviluppare codice della pipeline con Python.

Per informazioni di riferimento complete sulla sintassi SQL delle tabelle live Delta, vedere Informazioni di riferimento sul linguaggio SQL di Tabelle live Delta.

Nozioni di base di SQL per lo sviluppo di pipeline

Il codice SQL che crea set di dati delta live tables usa la CREATE OR REFRESH sintassi per definire viste materializzate e tabelle di streaming sui risultati delle query.

La STREAM parola chiave indica se l'origine dati a cui si fa riferimento in una SELECT clausola deve essere letta con la semantica di streaming.

Il codice sorgente delle tabelle live delta differisce in modo critico dagli script SQL: le tabelle Live Delta valutano tutte le definizioni di set di dati in tutti i file di codice sorgente configurati in una pipeline e compila un grafico del flusso di dati prima dell'esecuzione di qualsiasi query. L'ordine delle query visualizzate in un notebook o in uno script non definisce l'ordine di esecuzione.

Creare una vista materializzata con SQL

L'esempio di codice seguente illustra la sintassi di base per la creazione di una vista materializzata con SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Creare una tabella di streaming con SQL

L'esempio di codice seguente illustra la sintassi di base per la creazione di una tabella di streaming con SQL:

Nota

Non tutte le origini dati supportano le letture di streaming e alcune origini dati devono essere sempre elaborate con la semantica di streaming.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Caricare dati dall'archivio oggetti

Delta Live Tables supporta il caricamento di dati da tutti i formati supportati da Azure Databricks. Vedere Opzioni di formato dati.

Nota

Questi esempi usano i dati disponibili nell'area di lavoro montata automaticamente nell'area /databricks-datasets di lavoro. Databricks consiglia di usare i percorsi del volume o gli URI cloud per fare riferimento ai dati archiviati nell'archiviazione di oggetti cloud. Vedere Che cosa sono i volumi del catalogo Unity?.

Databricks consiglia di usare il caricatore automatico e le tabelle di streaming quando si configurano carichi di lavoro di inserimento incrementali sui dati archiviati nell'archiviazione di oggetti cloud. Vedere Che cos'è l’Autoloader?.

SQL usa la funzione per richiamare la read_files funzionalità del caricatore automatico. È anche necessario usare la STREAM parola chiave per configurare una lettura di streaming con read_files.

L'esempio seguente crea una tabella di streaming da file JSON usando il caricatore automatico:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

La funzione supporta anche la read_files semantica batch per creare viste materializzate. L'esempio seguente usa la semantica batch per leggere una directory JSON e creare una vista materializzata:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Convalidare i dati con le aspettative

È possibile usare le aspettative per impostare e applicare vincoli di qualità dei dati. Vedere Gestire la qualità dei dati con Delta Live Tables.

Il codice seguente definisce un'aspettativa denominata valid_data che elimina i record null durante l'inserimento dati:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Eseguire query su viste materializzate e tabelle di streaming definite nella pipeline

Usare lo LIVE schema per eseguire query su altre viste materializzate e tabelle di streaming definite nella pipeline.

L'esempio seguente definisce quattro set di dati:

  • Tabella di streaming denominata orders che carica i dati JSON.
  • Vista materializzata denominata customers che carica i dati CSV.
  • Una vista materializzata denominata customer_orders che unisce i record dai orders set di dati e customers , esegue il cast del timestamp dell'ordine a una data e seleziona i customer_idcampi , order_number, statee order_date .
  • Vista materializzata denominata daily_orders_by_state che aggrega il conteggio giornaliero degli ordini per ogni stato.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;