Partager via


Optimiser le traitement avec état dans Delta Live Tables avec des filigranes

Pour gérer efficacement les données conservées en l’état, utilisez des filigranes lorsque vous effectuez un traitement de flux avec état dans Delta Live Tables, notamment les agrégations, les jointures et la déduplication. Cet article explique comment utiliser des filigranes dans vos requêtes Delta Live Tables et inclut des exemples d’opérations recommandées.

Remarque

Pour vous assurer que les requêtes qui effectuent des agrégations sont traitées de manière incrémentielle et non entièrement recalculées avec chaque mise à jour, vous devez utiliser des filigranes.

Qu’est-ce qu’un filigrane ?

Dans le traitement de flux, un filigrane est une fonctionnalité Apache Spark qui peut définir un seuil basé sur le temps pour le traitement des données lors de l’exécution d’opérations avec état, telles que des agrégations. Les données arrivant sont traitées jusqu’à ce que le seuil soit atteint, moment où la fenêtre de temps définie par le seuil est fermée. Les filigranes peuvent être utilisés pour éviter les problèmes lors du traitement des requêtes, principalement lors du traitement de jeux de données plus volumineux ou d’un traitement de longue durée. Ces problèmes peuvent inclure une latence élevée dans la production de résultats et même des erreurs de mémoire insuffisante (OOM) en raison de la quantité de données conservées en l’état pendant le traitement. Étant donné que les données de diffusion en continu sont intrinsèquement non ordonnées, les filigranes prennent également en charge le calcul correct des opérations telles que les agrégations de fenêtre de temps.

Pour en savoir plus sur l’utilisation de filigranes dans le traitement de flux, consultez Filigrane dans Apache Spark Structured Streaming et Appliquer des filigranes pour contrôler les seuils de traitement des données.

Comment définir un filigrane ?

Vous définissez un filigrane en spécifiant un champ d’horodatage et une valeur représentant le seuil de temps pour l’arrivée de données tardives. Les données sont considérées tardives si elles arrivent après le seuil de temps défini. Par exemple, si le seuil est défini sur 10 minutes, les enregistrements arrivant après le seuil de 10 minutes peuvent être supprimés.

Étant donné que les enregistrements qui arrivent après le seuil défini peuvent être supprimés, la sélection d’un seuil qui répond à votre latence par rapport aux exigences de correction est importante. Le choix d’un seuil plus petit entraîne l’émission d’enregistrements plus tôt, mais signifie également que les enregistrements en retard sont plus susceptibles d’être supprimés. Un seuil plus important signifie une attente plus longue, mais des données peut-être plus complètes. En raison de la taille d’état supérieure, un seuil plus élevé peut également nécessiter des ressources informatiques supplémentaires. Étant donné que la valeur de seuil dépend de vos besoins en matière de données et de traitement, le test et la surveillance de votre traitement sont importants pour déterminer un seuil optimal.

Vous utilisez la fonction withWatermark() dans Python pour définir un filigrane. Dans SQL, utilisez la clause WATERMARK pour définir un filigrane :

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Utiliser des filigranes avec des jointures de flux-flux

Pour les jointures de flux-flux, vous devez définir un filigrane sur les deux côtés de la jointure et une clause d’intervalle de temps. Étant donné que chaque source de jointure a une vue incomplète des données, la clause d’intervalle de temps est requis pour indiquer au moteur de diffusion en continu quand aucune correspondance supplémentaire ne peut être effectuée. La clause d’intervalle de temps doit utiliser les mêmes champs que ceux utilisés pour définir les filigranes.

Étant donné qu’il peut arriver que chaque flux nécessite des seuils différents pour les filigranes, il n’est pas nécessaire que les flux aient les mêmes seuils. Pour éviter les données manquantes, le moteur de diffusion en continu conserve un filigrane global basé sur le flux le plus lent.

L’exemple suivant joint un flux d’impressions publicitaires et un flux de clics d’utilisateur sur les publicités. Dans cet exemple, un clic doit se produire dans les 3 minutes après l’impression. Une fois l’intervalle de temps de 3 minutes passé, les lignes de l’état qui ne peuvent plus être mises en correspondance sont supprimées.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Effectuer des agrégations fenêtrées avec des filigranes

Une opération avec état courante sur les données de streaming est une agrégation fenêtrée. Les agrégations fenêtrées sont similaires aux agrégations groupées, sauf que les valeurs d’agrégation sont retournées pour l’ensemble de lignes qui font partie de la fenêtre définie.

Une fenêtre peut être définie comme une certaine longueur et une opération d’agrégation peut être effectuée sur toutes les lignes qui font partie de cette fenêtre. Spark Streaming prend en charge trois types de fenêtres :

  • Fenêtres bascule (fixe) : une série d’intervalles de temps contigus fixes, qui ne se chevauchent pas. Un enregistrement d’entrée appartient à une seule fenêtre.
  • Fenêtres glissantes : similaires aux fenêtres bascules, les fenêtres glissantes sont de taille fixe, mais les fenêtres peuvent se chevaucher et un enregistrement peut tomber dans plusieurs fenêtres.

Lorsque les données arrivent au-delà de la fin de la fenêtre, plus la longueur du filigrane, aucune nouvelle donnée n’est acceptée pour la fenêtre, le résultat de l’agrégation est émis et l’état de la fenêtre est supprimé.

L’exemple suivant calcule une somme d’impressions toutes les 5 minutes en utilisant une fenêtre fixe. Dans cet exemple, la clause Select utilise l’alias impressions_window, puis la fenêtre elle-même est définie dans le cadre de la clause GROUP BY . La fenêtre doit être basée sur la même colonne d’horodatage que le filigrane, la colonne clickTimestamp dans cet exemple.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Exemple similaire dans Python pour calculer les bénéfices sur les fenêtres fixes horaires :

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Dédupliquer les enregistrements de diffusion en continu

Structured Streaming a des garanties de traitement en une seule fois, mais ne déduplique pas automatiquement des enregistrements à partir de sources de données. Par exemple, étant donné que de nombreuses files d’attente de messages ont au moins une fois des garanties, les enregistrements en double doivent être attendus lors de la lecture de l’une de ces files d’attente de messages. Vous pouvez utiliser la fonction dropDuplicatesWithinWatermark() pour dédupliquer des enregistrements sur n’importe quel champ spécifié, ce qui supprime les doublons d’un flux, même en cas de différence de champs (telle que l’heure d’événement ou l’heure d’arrivée). Vous devez spécifier un filigrane pour utiliser la fonction dropDuplicatesWithinWatermark(). Toutes les données en double qui arrivent dans l’intervalle de temps spécifié par le filigrane sont supprimées.

Les données ordonnées sont importantes, car les données non ordonnées provoquent un saut en avant incorrect de la valeur de filigrane. Ensuite, lorsque des données plus anciennes arrivent, elles sont considérées comme tardives et sont supprimées. Utilisez l’option withEventTimeOrder pour traiter l’instantané initial dans l’ordre en fonction de l’horodatage spécifié dans le filigrane. L’option withEventTimeOrder peut être déclarée dans le code définissant le jeu de données ou dans les paramètres de pipeline à l’aide de spark.databricks.delta.withEventTimeOrder.enabled. Par exemple :

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Remarque

L’option withEventTimeOrder est prise en charge uniquement avec Python.

Dans l’exemple suivant, les données sont traitées par ordre de clickTimestamp, et les enregistrements arrivant dans les 5 secondes l’un après l’autre et qui contiennent des userId en double et des colonnes clickAdId sont supprimés.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimiser la configuration du pipeline pour le traitement avec état

Pour éviter les problèmes de production et la latence excessive, Databricks recommande d’activer la gestion de l’état basée sur RocksDB pour votre traitement de flux avec état, en particulier si votre traitement nécessite d’enregistrer une grande quantité d’états intermédiaires.

Les pipelines sans serveur gèrent automatiquement les configurations du magasin d’états.

Vous pouvez activer la gestion d'état basée sur RocksDB en définissant la configuration suivante avant de déployer un pipeline :

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Pour en savoir plus sur le magasin d’état RocksDB, y compris les recommandations de configuration pour RocksDB, consultez Configurer un magasin d’état RocksDB sur Azure Databricks.