Usare parameters con le pipeline Delta Live Tables
Questo articolo illustra come usare le configurazioni della pipeline Delta Live Tables per parametrizzare il codice della pipeline.
Riferimento parameters
Durante gli aggiornamenti, il codice sorgente della pipeline può accedere alla pipeline parameters utilizzando la sintassi getvalues per le configurazioni di Spark.
Si fa riferimento alla pipeline parameters usando la chiave . Il valore viene inserito nel codice sorgente come stringa prima che la logica del codice sorgente valuti.
La sintassi di esempio seguente usa un parametro con chiave source_catalog
e valore dev_catalog
per specificare l'origine dati per una vista materializzata:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
COUNT(txn_id) txn_count,
SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return (spark.read
.table(f"{source_catalog}.sales.transactions_table")
.groupBy("account_id")
.agg(
count(col("txn_id").alias("txn_count")),
sum(col("txn_amount").alias("account_revenue"))
)
)
Set parameters
Passare parameters alle pipeline trasmettendo coppie chiave-valore arbitrarie come configurazioni per le pipeline. È possibile setparameters durante la definizione o la modifica di una configurazione della pipeline usando l'interfaccia utente dell'area di lavoro o JSON. Vedere Configurare una pipeline di Tables Delta Live.
Le chiavi dei parametri della pipeline possono contenere _ - .
solo caratteri alfanumerici o alfanumerici. I parametri values sono set come stringhe.
La pipeline parameters non supporta valuesdinamiche. È necessario update il valore associato a una chiave nella configurazione della pipeline.
Importante
Non usare parole chiave in conflitto con la pipeline riservata o la configurazione di Apache Spark values.
Parametrizzare le dichiarazioni del set di dati in Python o SQL
Il codice Python e SQL che definisce i set di dati possono essere parametrizzati dalle impostazioni della pipeline. La parametrizzazione abilita i casi d'uso seguenti:
- Separazione di percorsi lunghi e altre variabili dal codice.
- Riduzione della quantità di dati elaborati in ambienti di sviluppo o staging per velocizzare i test.
- Riutilizzo della stessa logica di trasformazione per l'elaborazione da più origini dati.
Nell'esempio seguente si utilizza il valore di configurazione startDate
per limit la pipeline di sviluppo su un sottoinsieme dei dati di input.
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
Controllare le origini dati con parameters
È possibile usare la pipeline parameters per specificare origini dati diverse in configurazioni diverse della stessa pipeline.
Ad esempio, è possibile specificare percorsi diversi nelle configurazioni di sviluppo, test e produzione per una pipeline usando la variabile data_source_path
e quindi farvi riferimento usando il codice seguente:
SQL
CREATE STREAMING TABLE bronze
AS (
SELECT
*,
_metadata.file_path AS source_file_path
FROM read_files( '${data_source_path}', 'csv',
map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
return (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.load(data_source_path )
.select("*", col("_metadata.file_path").alias("source_file_name"))
)
Questo modello è utile per testare il modo in cui la logica di inserimento può gestire schema o dati in formato non valido durante l'inserimento iniziale. È possibile usare il codice identico in tutta la pipeline in tutti gli ambienti durante la disattivazione dei set di dati.