Caricare ed elaborare i dati in modo incrementale con flussi di Tables Delta Live
Questo articolo spiega cosa sono i flussi di lavoro e come è possibile utilizzare i flussi di lavoro nelle pipeline di Delta Live Tables per elaborare in modo incrementale i dati da un'origine a una destinazione di streaming table. In Delta Live Tablesi flussi vengono definiti in due modi:
- Un flusso viene definito automaticamente quando si crea una query che aggiorna uno streaming table.
- Delta Live Tables offre anche funzionalità per definire in modo esplicito i flussi per un'elaborazione più complessa, ad esempio l'aggiunta a un table di streaming da più origini di streaming.
Questo articolo discute i flussi impliciti che vengono creati quando si definisce una query per update un tabledi streaming e fornisce poi dettagli sulla sintassi per definire flussi più complessi.
Che cos'è un flusso?
In Delta Live Tablesun flusso di è una query di streaming che elabora i dati di origine in modo incrementale per update un flusso di destinazione table. La maggior parte dei set di dati Delta Live Tables creati in una pipeline definisce il flusso come parte della query e non richiede la definizione esplicita del flusso. Ad esempio, si crea un table di streaming in Delta Live Tables in un singolo comando DDL anziché usare istruzioni di flusso e table separate per creare il tabledi streaming :
Nota
Questo esempio CREATE FLOW
viene fornito solo a scopo illustrativo e include parole chiave non valide per la sintassi delta Live Tables.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Oltre al flusso predefinito definito da una query, le interfacce Delta Live Tables Python e SQL forniscono la funzionalità di flusso di accodamento . Il flusso di accodamento supporta l'elaborazione che richiede la lettura di dati da più origini di streaming per update un singolo flusso table. Ad esempio, è possibile usare la funzionalità del flusso di accodamento quando si dispone di un flusso di streaming esistente table e si vuole aggiungere una nuova origine di streaming che scrive in questo flusso esistente table.
Usare il flusso di accodamento per scrivere in un table di streaming da più flussi di origine
Usare il decoratore @append_flow
nell'interfaccia Python o la clausola CREATE FLOW
nell'interfaccia SQL per scrivere in un table di streaming da più sorgenti in streaming. Usare il flusso di accodamento per l'elaborazione di attività come le seguenti:
- Aggiungere origini di streaming che aggiungono dati a un table di streaming esistente senza richiedere un refreshcompleto. Ad esempio, si potrebbe avere un table che combina i dati regionali di ogni regione in cui si opera. Man mano che vengono implementate nuove aree, è possibile aggiungere i nuovi dati di area al table senza eseguire un refreshcompleto. Vedere esempio: Scrivere in un table di streaming da più argomenti Kafka.
- Update un table di streaming aggiungendo dati cronologici mancanti (backfilling). Ad esempio, si dispone di un table di streaming esistente scritto da un topic Apache Kafka. Hai anche dati storici archiviati in un table che devi inserire esattamente una volta nello streaming table, e non puoi trasmettere i dati perché la tua elaborazione prevede di eseguire un'aggregazione complessa prima di inserire i dati. Vedere Esempio: Eseguire un backfill di dati monouso.
- Combinare dati da più origini e scrivere su un singolo flusso table invece di usare la clausola
UNION
in una query. L'elaborazione del flusso di accodamento, invece di utilizzareUNION
, consente di update il target table in modo incrementale senza eseguire un completo refreshupdate. Vedere Esempio: Usare l'elaborazione del flusso di accodamento anziché UNION.
La destinazione dei record generati dall'elaborazione del flusso di accodamento può essere un table esistente oppure un nuovo table. Per le query Python, utilizzare la funzione create_streaming_table() per creare un target table.
Importante
- Se hai bisogno di definire vincoli di qualità dei dati con aspettative , definisci le aspettative sul table di destinazione come parte della funzione
create_streaming_table()
o su una definizione esistente di table. Non è possibile definire le aspettative nella@append_flow
definizione. - I flussi vengono identificati da un nome di flusso e questo nome viene usato per identificare i checkpoint di streaming. L'uso del nome del flusso per identificare il checkpoint indica quanto segue:
- Se un flusso esistente in una pipeline viene rinominato, il checkpoint non viene portato avanti e il flusso rinominato è effettivamente un flusso completamente nuovo.
- Non è possibile riutilizzare un nome di flusso in una pipeline, perché il checkpoint esistente non corrisponde alla nuova definizione del flusso.
Di seguito è riportata la sintassi per @append_flow
:
Python
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
SQL
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
target_table BY NAME
SELECT * FROM
source;
esempio : Scrivere in un table di streaming da più argomenti Kafka
Gli esempi seguenti creano un table di streaming denominato kafka_target
e scrivono in tale flusso table da due argomenti Kafka:
Python
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Per altre informazioni sulla funzione read_kafka()
table-valued usata nelle query SQL, vedere read_kafka nelle informazioni di riferimento sul linguaggio SQL.
Esempio: Eseguire un riempimento dati monouso
Gli esempi seguenti eseguono una query per aggiungere dati cronologici a un tabledi streaming:
Nota
Per garantire un vero backfill monouso quando la query di backfill fa parte di una pipeline eseguita su base pianificata o in modo continuo, remove la query dopo l'esecuzione della pipeline una sola volta. Per aggiungere nuovi dati se arrivano nella directory backfill, lasciare la query sul posto.
Python
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
SQL
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Esempio: usare l'elaborazione del flusso di accodamento anziché UNION
Anziché usare una query con una clausola UNION
, è possibile usare query di flusso di aggiunta per combinare più origini e scrivere su un singolo flusso table. L'utilizzo di query di flusso di accodamento anziché UNION
consente di aggiungere dati a un flusso table proveniente da più origini senza dover eseguire un completo di refresh.
L'esempio python seguente include una query che combina più origini dati con una UNION
clausola :
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Gli esempi seguenti sostituiscono la UNION
query con le query del flusso di accodamento:
Python
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
SQL
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/us",
"csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/eu",
"csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
read_files(
"/path/to/orders/apac",
"csv"
);