Condividi tramite


Optimize l'elaborazione con stato in Delta Live Tables con filigrane

Per gestire in modo efficace i dati mantenuti nello stato, usare le filigrane durante l'elaborazione di flussi con stato in Delta Live Tables, incluse aggregazioni, join e deduplicazione. Questo articolo descrive come utilizzare i watermark nelle query Delta Live Tables e include esempi delle operazioni consigliate.

Nota

Per garantire che le query che eseguono aggregazioni vengano elaborate in modo incrementale e non completamente ricalcolate con ogni update, è necessario usare filigrane.

Che cos'è un watermark?

Nell'elaborazione del flusso, un watermark è una funzionalità di Apache Spark che può definire una soglia basata sul tempo per l'elaborazione dei dati durante l'esecuzione di operazioni con stato, ad esempio le aggregazioni. I dati in arrivo vengono elaborati fino a quando non viene raggiunta la soglia, al momento in cui il tempo window definito dalla soglia viene chiuso. Le filigrane possono essere usate per evitare problemi durante l'elaborazione delle query, principalmente durante l'elaborazione di set di dati di dimensioni maggiori o l'elaborazione a esecuzione prolungata. Questi problemi possono includere una latenza elevata nella produzione di risultati e anche errori di memoria insufficiente a causa della quantità di dati mantenuti nello stato durante l'elaborazione. Poiché i dati di streaming sono intrinsecamente non ordinati, le filigrane supportano anche il calcolo corretto di operazioni come le aggregazioni temporaliwindow.

Per altre informazioni sull'uso delle filigrane nell'elaborazione del flusso, vedere Filigrana in Apache Spark Structured Streaming e Applicare filigrane per controllare le soglie di elaborazione dei dati.

Come si definisce un watermark?

Per definire un watermark, specificare un campo timestamp e un valore che rappresenta la soglia di tempo entro cui i dati in ritardo devono arrivare. I dati sono considerati in ritardo se arrivano dopo la soglia temporale definita. Ad esempio, se la soglia è definita come 10 minuti, i record che arrivano dopo la soglia di 10 minuti potrebbero essere eliminati.

Poiché i record che arrivano dopo la soglia definita potrebbero essere eliminati, è importante selezionare una soglia che soddisfi i requisiti di latenza e correttezza. La scelta di una soglia più piccola comporta l'emissione dei record prima, ma significa anche che è più probabile che i record in ritardo vengano eliminati. Una soglia maggiore indica un'attesa più lunga, ma probabilmente più completa dei dati. A causa delle dimensioni dello stato maggiori, una soglia maggiore potrebbe richiedere anche risorse di calcolo aggiuntive. Poiché il valore soglia dipende dai requisiti di dati e elaborazione, il test e il monitoraggio dell'elaborazione sono importanti per determinare una soglia ottimale.

Usi la funzione withWatermark() in Python per definire un watermark. In SQL usare la clausola WATERMARK per definire un watermark:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Usare filigrane con join tra flussi

Per i join di flusso, è necessario definire un watermark su entrambi i lati della join e una clausola di intervallo di tempo. Poiché ogni origine join ha una visione incompleta dei dati, è necessaria la clausola relativa all'intervallo di tempo per indicare al motore di streaming quando non è possibile effettuare ulteriori corrispondenze. La clausola intervallo di tempo deve utilizzare gli stessi campi usati per definire le filigrane.

Poiché potrebbero verificarsi momenti in cui ogni flusso richiede soglie diverse per le filigrane, i flussi non devono avere le stesse soglie. Per evitare dati mancanti, il motore di streaming mantiene un watermark globale basato sul flusso più lento.

L'esempio seguente unisce un flusso di impression pubblicitarie e un flusso di clic degli utenti sugli annunci. In questo esempio, un clic deve verificarsi entro 3 minuti dall'impressione. Dopo il passaggio dell'intervallo di tempo di 3 minuti, le righe dello stato che non possono più essere confrontate vengono eliminate.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Eseguire aggregazioni finestrate con filigrane

Un'operazione con stato comune sui dati di streaming è un'aggregazione con finestra. Le aggregazioni con finestra sono simili alle aggregazioni raggruppate, ad eccezione che le aggregazioni values vengono restituite per il set di righe che fanno parte del windowche è stato definito.

Un window può essere definito come una determinata lunghezza e un'operazione di aggregazione può essere eseguita su tutte le righe che fanno parte di tale window. Spark Streaming supporta tre tipi di finestre:

  • Finestre a cascata (fisse): serie di intervalli di tempo fissi, non sovrapposti e contigui. Un record di input appartiene solo a un singolo window.
  • Finestre scorrevoli: analogamente alle finestre a cascata, le finestre scorrevoli sono a dimensione fissa, ma le finestre possono sovrapporsi e un record può rientrare in più finestre.

Quando i dati arrivano oltre la fine del window più la lunghezza del watermark, non vengono accettati nuovi dati per il window, il risultato dell'aggregazione viene emesso e lo stato per il window viene eliminato.

Nell'esempio seguente viene calcolata la somma delle impressioni ogni 5 minuti utilizzando un windowfisso. In questo esempio, la clausola select usa l'alias impressions_windowe quindi il window stesso viene definito come parte della clausola GROUP BY. Il window deve essere basato sullo stesso timestamp column del watermark, del clickTimestampe delcolumn in questo esempio.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Un esempio simile in Python per calcolare il profitto su finestre fisse orarie:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplicare i record di streaming

Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. Ad esempio, poiché molte code di messaggi hanno almeno una volta garanzie, i record duplicati devono essere previsti durante la lettura da una di queste code di messaggi. È possibile usare la dropDuplicatesWithinWatermark() funzione per deduplicare i record in qualsiasi campo specificato, rimuovendo i duplicati da un flusso anche se alcuni campi differiscono , ad esempio l'ora dell'evento o l'ora di arrivo. È necessario specificare un watermark per usare la funzione dropDuplicatesWithinWatermark(). Tutti i dati duplicati che arrivano entro l'intervallo di tempo specificato dal watermark vengono eliminati.

I dati ordinati sono importanti perché i dati non ordinati fanno sì che il valore watermark proceda in modo errato. Quindi, quando arrivano dati meno recenti, vengono considerati in ritardo e eliminati. Usare l'opzione withEventTimeOrder per elaborare lo snapshot iniziale in base al timestamp specificato nella watermark. L'opzione withEventTimeOrder può essere dichiarata nel codice che definisce il set di dati o nelle impostazioni della pipeline usando spark.databricks.delta.withEventTimeOrder.enabled. Ad esempio:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Nota

L'opzione withEventTimeOrder è supportata solo con Python.

Nell'esempio seguente, i dati vengono elaborati in ordine di clickTimestamp, e i record che arrivano entro un intervallo di 5 secondi che contengono duplicati di userId e di clickAdIdcolumns vengono eliminati.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimize configurazione della pipeline per l'elaborazione con stato

Per evitare problemi di produzione e una latenza eccessiva, Databricks consiglia di abilitare la gestione dello stato basata su RocksDB per l'elaborazione del flusso con stato, in particolare se l'elaborazione richiede un notevole risparmio di una grande quantità di stato intermedio.

Le pipeline senza sever gestiscono automaticamente le configurazioni dell'archivio stati.

È possibile abilitare la gestione dello stato basata su RocksDB impostando la configurazione seguente prima di distribuire una pipeline:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Per altre informazioni sull'archivio stati di RocksDB, incluse le raccomandazioni di configurazione per RocksDB, vedere Configurare l'archivio stati di RocksDB in Azure Databricks.