Condividi tramite


Caricare ed elaborare i dati in modo incrementale con flussi di Tables Delta Live

Questo articolo spiega cosa sono i flussi di lavoro e come è possibile utilizzare i flussi di lavoro nelle pipeline di Delta Live Tables per elaborare in modo incrementale i dati da un'origine a una destinazione di streaming table. In Delta Live Tablesi flussi vengono definiti in due modi:

  1. Un flusso viene definito automaticamente quando si crea una query che aggiorna uno streaming table.
  2. Delta Live Tables offre anche funzionalità per definire in modo esplicito i flussi per un'elaborazione più complessa, ad esempio l'aggiunta a un table di streaming da più origini di streaming.

Questo articolo discute i flussi impliciti che vengono creati quando si definisce una query per update un tabledi streaming e fornisce poi dettagli sulla sintassi per definire flussi più complessi.

Che cos'è un flusso?

In Delta Live Tablesun flusso di è una query di streaming che elabora i dati di origine in modo incrementale per update un flusso di destinazione table. La maggior parte dei set di dati Delta Live Tables creati in una pipeline definisce il flusso come parte della query e non richiede la definizione esplicita del flusso. Ad esempio, si crea un table di streaming in Delta Live Tables in un singolo comando DDL anziché usare istruzioni di flusso e table separate per creare il tabledi streaming :

Nota

Questo esempio CREATE FLOW viene fornito solo a scopo illustrativo e include parole chiave non valide per la sintassi delta Live Tables.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Oltre al flusso predefinito definito da una query, le interfacce Delta Live Tables Python e SQL forniscono la funzionalità di flusso di accodamento . Il flusso di accodamento supporta l'elaborazione che richiede la lettura di dati da più origini di streaming per update un singolo flusso table. Ad esempio, è possibile usare la funzionalità del flusso di accodamento quando si dispone di un flusso di streaming esistente table e si vuole aggiungere una nuova origine di streaming che scrive in questo flusso esistente table.

Usare il flusso di accodamento per scrivere in un table di streaming da più flussi di origine

Usare il decoratore @append_flow nell'interfaccia Python o la clausola CREATE FLOW nell'interfaccia SQL per scrivere in un table di streaming da più sorgenti in streaming. Usare il flusso di accodamento per l'elaborazione di attività come le seguenti:

  • Aggiungere origini di streaming che aggiungono dati a un table di streaming esistente senza richiedere un refreshcompleto. Ad esempio, si potrebbe avere un table che combina i dati regionali di ogni regione in cui si opera. Man mano che vengono implementate nuove aree, è possibile aggiungere i nuovi dati di area al table senza eseguire un refreshcompleto. Vedere esempio: Scrivere in un table di streaming da più argomenti Kafka.
  • Update un table di streaming aggiungendo dati cronologici mancanti (backfilling). Ad esempio, si dispone di un table di streaming esistente scritto da un topic Apache Kafka. Hai anche dati storici archiviati in un table che devi inserire esattamente una volta nello streaming table, e non puoi trasmettere i dati perché la tua elaborazione prevede di eseguire un'aggregazione complessa prima di inserire i dati. Vedere Esempio: Eseguire un backfill di dati monouso.
  • Combinare dati da più origini e scrivere su un singolo flusso table invece di usare la clausola UNION in una query. L'elaborazione del flusso di accodamento, invece di utilizzare UNION, consente di update il target table in modo incrementale senza eseguire un completo refreshupdate. Vedere Esempio: Usare l'elaborazione del flusso di accodamento anziché UNION.

La destinazione dei record generati dall'elaborazione del flusso di accodamento può essere un table esistente oppure un nuovo table. Per le query Python, utilizzare la funzione create_streaming_table() per creare un target table.

Importante

  • Se hai bisogno di definire vincoli di qualità dei dati con aspettative , definisci le aspettative sul table di destinazione come parte della funzione create_streaming_table() o su una definizione esistente di table. Non è possibile definire le aspettative nella @append_flow definizione.
  • I flussi vengono identificati da un nome di flusso e questo nome viene usato per identificare i checkpoint di streaming. L'uso del nome del flusso per identificare il checkpoint indica quanto segue:
    • Se un flusso esistente in una pipeline viene rinominato, il checkpoint non viene portato avanti e il flusso rinominato è effettivamente un flusso completamente nuovo.
    • Non è possibile riutilizzare un nome di flusso in una pipeline, perché il checkpoint esistente non corrisponde alla nuova definizione del flusso.

Di seguito è riportata la sintassi per @append_flow:

Python

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

SQL

CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

esempio : Scrivere in un table di streaming da più argomenti Kafka

Gli esempi seguenti creano un table di streaming denominato kafka_target e scrivono in tale flusso table da due argomenti Kafka:

Python

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Per altre informazioni sulla funzione read_kafka()table-valued usata nelle query SQL, vedere read_kafka nelle informazioni di riferimento sul linguaggio SQL.

Esempio: Eseguire un riempimento dati monouso

Gli esempi seguenti eseguono una query per aggiungere dati cronologici a un tabledi streaming:

Nota

Per garantire un vero backfill monouso quando la query di backfill fa parte di una pipeline eseguita su base pianificata o in modo continuo, remove la query dopo l'esecuzione della pipeline una sola volta. Per aggiungere nuovi dati se arrivano nella directory backfill, lasciare la query sul posto.

Python

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Esempio: usare l'elaborazione del flusso di accodamento anziché UNION

Anziché usare una query con una clausola UNION, è possibile usare query di flusso di aggiunta per combinare più origini e scrivere su un singolo flusso table. L'utilizzo di query di flusso di accodamento anziché UNION consente di aggiungere dati a un flusso table proveniente da più origini senza dover eseguire un completo di refresh.

L'esempio python seguente include una query che combina più origini dati con una UNION clausola :

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Gli esempi seguenti sostituiscono la UNION query con le query del flusso di accodamento:

Python

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  read_files(
    "/path/to/orders/apac",
    "csv"
  );