Condividi tramite


Usare parameters con le pipeline Delta Live Tables

Questo articolo illustra come usare le configurazioni della pipeline Delta Live Tables per parametrizzare il codice della pipeline.

Riferimento parameters

Durante gli aggiornamenti, il codice sorgente della pipeline può accedere alla pipeline parameters utilizzando la sintassi getvalues per le configurazioni di Spark.

Si fa riferimento alla pipeline parameters usando la chiave . Il valore viene inserito nel codice sorgente come stringa prima che la logica del codice sorgente valuti.

La sintassi di esempio seguente usa un parametro con chiave source_catalog e valore dev_catalog per specificare l'origine dati per una vista materializzata:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

Set parameters

Passare parameters alle pipeline trasmettendo coppie chiave-valore arbitrarie come configurazioni per le pipeline. È possibile setparameters durante la definizione o la modifica di una configurazione della pipeline usando l'interfaccia utente dell'area di lavoro o JSON. Vedere Configurare una pipeline di Tables Delta Live.

Le chiavi dei parametri della pipeline possono contenere _ - . solo caratteri alfanumerici o alfanumerici. I parametri values sono set come stringhe.

La pipeline parameters non supporta valuesdinamiche. È necessario update il valore associato a una chiave nella configurazione della pipeline.

Importante

Non usare parole chiave in conflitto con la pipeline riservata o la configurazione di Apache Spark values.

Parametrizzare le dichiarazioni del set di dati in Python o SQL

Il codice Python e SQL che definisce i set di dati possono essere parametrizzati dalle impostazioni della pipeline. La parametrizzazione abilita i casi d'uso seguenti:

  • Separazione di percorsi lunghi e altre variabili dal codice.
  • Riduzione della quantità di dati elaborati in ambienti di sviluppo o staging per velocizzare i test.
  • Riutilizzo della stessa logica di trasformazione per l'elaborazione da più origini dati.

Nell'esempio seguente si utilizza il valore di configurazione startDate per limit la pipeline di sviluppo su un sottoinsieme dei dati di input.

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

Controllare le origini dati con parameters

È possibile usare la pipeline parameters per specificare origini dati diverse in configurazioni diverse della stessa pipeline.

Ad esempio, è possibile specificare percorsi diversi nelle configurazioni di sviluppo, test e produzione per una pipeline usando la variabile data_source_path e quindi farvi riferimento usando il codice seguente:

SQL

CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

Questo modello è utile per testare il modo in cui la logica di inserimento può gestire schema o dati in formato non valido durante l'inserimento iniziale. È possibile usare il codice identico in tutta la pipeline in tutti gli ambienti durante la disattivazione dei set di dati.