Partager via


Points de contrôle Structured Streaming

Les points de contrôle et les journaux en écriture anticipée fonctionnent ensemble pour fournir des garanties de traitement pour les charges de travail Structured Streaming. Le point de contrôle suit les informations qui identifient la requête, notamment les informations d’état et les enregistrements traités. Lorsque vous supprimez les fichiers d’un répertoire de point de contrôle ou que vous passez à un nouvel emplacement de point de contrôle, l’exécution suivante de la requête recommence à zéro.

Chaque requête doit avoir un emplacement de point de contrôle distinct. Plusieurs requêtes ne doivent jamais partager le même emplacement.

Activer la création de point de contrôle pour des requêtes de Structured Streaming

Vous devez spécifier l’option checkpointLocation avant d’exécuter une requête de diffusion en continu, comme dans l’exemple suivant :

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Remarque

Certains récepteurs, tels que la sortie pour display() dans les notebooks et le récepteur memory, génèrent automatiquement un emplacement de point de contrôle temporaire si vous omettez cette option. Ces emplacements de point de contrôle temporaires ne garantissent pas la tolérance de panne ou la cohérence des données, et peuvent ne pas être nettoyés correctement. Databricks conseille de toujours spécifier un emplacement de point de contrôle pour ces récepteurs.

Récupérer après des modifications dans une requête de Structured Streaming

Il existe des limitations aux modifications de requête de diffusion en continu autorisées entre les redémarrages à partir d’un même emplacement de point de contrôle. Voici quelques modifications qui ne sont pas autorisées, ou dont l’effet n’est pas bien défini. Pour la totalité d’entre elles :

  •  Autorisée  signifie que vous pouvez apporter la modification spécifiée, mais que le caractère bien défini de la sémantique de son effet dépend de la requête et de la modification.
  •  Non autorisée  signifie que vous ne deviez pas apporter la modification spécifiée, car la requête redémarrée risque d’échouer avec des erreurs imprévisibles.
  • sdf représente une tramedonnées ou un jeu de données de diffusion en continu générés avec sparkSession.readStream.

Types de modifications dans des requêtes de Structured Streaming

  • Modifications du nombre ou du type (à savoir, source différente) des sources d’entrée : cela n’est pas autorisé.
  • Modifications des paramètres de sources d’entrée : la source et la requête déterminent si la modification est autorisée et si sa sémantique est bien définie. Voici quelques exemples :
    • L’ajout, la suppression et la modification des limites de taux sont autorisés :

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Les modifications d’articles et de fichiers souscrits ne sont généralement pas autorisées, car leurs résultats sont imprévisibles : de spark.readStream.format("kafka").option("subscribe", "article") en spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Modifications dans l’intervalle de déclencheur : vous pouvez modifier les déclencheurs entre les lots incrémentiels et les intervalles de temps. Consultez Modification des intervalles de déclencheur entre des exécutions.
  • Modifications du type de récepteur de sortie : les modifications de quelques combinaisons spécifiques de récepteurs sont autorisées. Cela doit être vérifié au cas par cas. Voici quelques exemples :
    • La modification de récepteur de fichiers en récepteur Kafka est autorisée. Kafka ne verra que les nouvelles données.
    • La modification de récepteur Kafka en récepteur de fichiers n’est pas autorisée.
    • La modification du récepteur Kafka en foreach, ou l’inverse, est autorisée.
  • Modifications des paramètres de récepteur de sortie : le récepteur et la requête déterminent si la modification est autorisée et si sa sémantique est bien définie. Voici quelques exemples :
    • Les modifications apportées au répertoire de sortie d’un récepteur de fichiers ne sont pas autorisées : de sdf.writeStream.format("parquet").option("path", "/somePath") en sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Les modifications apportées à la rubrique de sortie sont autorisées : sdf.writeStream.format("kafka").option("topic", "topic1") à sdf.writeStream.format("kafka").option("topic", "topic2")
    • Les modifications apportées au récepteur foreach défini par l’utilisateur (autrement dit, le code ForeachWriter) sont autorisées, mais leur sémantique dépend du code.
  • Modifications des opérations de type projection, filtrage et mappage  : certains cas sont autorisés. Par exemple :
    • L’ajout et la suppression de filtres sont autorisé : de sdf.selectExpr("a") en sdf.where(...).selectExpr("a").filter(...).
    • Les modifications apportées aux projections avec le même schéma de sortie sont autorisées : de sdf.selectExpr("stringColumn AS json").writeStream en sdf.select(to_json(...).as("json")).writeStream.
    • Les modifications apportées aux projections avec un schéma de sortie différent sont autorisées sous condition : une modification de sdf.selectExpr("a").writeStream en sdf.selectExpr("b").writeStream n’est autorisée que si le récepteur de sortie autorise la modification de schéma de "a" en "b".
  • Modifications des opérations avec état : certaines opérations dans des requêtes de diffusion en continu doivent gérer les données d’état de façon à mettre à jour le résultat en permanence. La diffusion en continu structurée crée automatiquement des points de contrôle pour les données d’état dans un stockage avec tolérance de panne (par exemple, DBFS, Stockage Blob Azure), et les restaure après redémarrage. Toutefois, cela suppose que le schéma des données d’état ne change pas au fil des redémarrages. Cela signifie que l’apport de modifications (ajouts, suppressions ou modifications de schéma) aux opérations avec état d’une requête de diffusion en continu n’est pas autorisés entre les redémarrages. Voici la liste des opérations avec état dont le schéma ne doit pas être modifié entre les redémarrages afin de garantir la récupération de l’état :
    • Agrégation de diffusion en continu : par exemple, sdf.groupBy("a").agg(...). Les modifications du nombre ou du type de clés de regroupement ou d’agrégats ne sont pas autorisées.
    • Déduplication de la diffusion en continu : par exemple, sdf.dropDuplicates("a"). Les modifications du nombre ou du type de clés de regroupement ou d’agrégats ne sont pas autorisées.
    • Jointure flux-flux : par exemple, sdf1.join(sdf2, ...) (les deux entrées sont générées avec sparkSession.readStream). Les modifications des colonnes schema ou equi-joining ne sont pas autorisées. Les modifications de type de jointure (externe ou interne) ne sont pas autorisées. Les autres modifications de la condition de jointure sont mal définies.
    • Opération avec état arbitraire : par exemple, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Les modifications du schéma de l’état défini par l’utilisateur et du type de délai d’expiration ne sont pas autorisées. Toute modification au sein de la fonction de mappage d’état défini par l’utilisateur est autorisée, mais l’effet sémantique de la modification dépend de la logique définie par l’utilisateur. Si vous souhaitez vraiment prendre en charge les modifications de schéma d’état, vous pouvez encoder/décoder explicitement vos structures de données d’état complexes en octets à l’aide d’un schéma d’encodage/décodage prenant en charge la migration de schéma. Par exemple, si vous enregistrez votre état sous la forme d’octets encodés Avro, vous pouvez modifier le schéma d’état Avro entre les redémarrages de requête, car cela a pour effet de restaurer l’état binaire.