Partager via


Qu’est-ce que le suivi de progression asynchrone ?

Important

Cette fonctionnalité est disponible en préversion publique.

Le suivi asynchrone de la progression permet aux pipelines Structured Streaming de contrôler la progression de manière asynchrone et parallèle au traitement des données réel dans un micro-lot, ce qui réduit la latence associée à la maintenance de offsetLog et commitLog.

Suivi de progression asynchrone

Notes

Le suivi de progression asynchrone ne fonctionne pas avec les déclencheurs Trigger.once ou Trigger.availableNow. La tentative d’activation de cette fonctionnalité avec ces déclencheurs entraîne l’échec de la requête.

Comment le suivi de progression asynchrone fonctionne-t-il pour réduire la latence ?

Structured Streaming s’appuie sur la persistance et la gestion des décalages en tant qu’indicateurs de progression pour le traitement des requêtes. L’opération de gestion des décalages a un impact direct sur la latence de traitement, car aucun traitement des données ne peut se produire tant que ces opérations ne sont pas terminées. Le suivi de progression asynchrone permet aux pipelines Structured Streaming de contrôler la progression sans être affectés par ces opérations de gestion des décalages.

Quand devez-vous configurer la fréquence des points de contrôle ?

Les utilisateurs peuvent configurer la fréquence à laquelle la progression est pointée. Les paramètres par défaut pour la fréquence des points de contrôle fournissent un débit correct pour la plupart des requêtes. La configuration de la fréquence est utile pour les scénarios dans lesquels les opérations de gestion des décalages se produisent à un rythme plus élevé qu’elles ne peuvent être traitées, ce qui crée un backlog toujours croissant d’opérations de gestion des décalages. Pour endiguer ce backlog croissant, le traitement des données est bloqué ou ralenti, ce qui rétablit essentiellement le comportement de traitement pour éliminer les avantages du suivi de la progression asynchrone.

Notes

Le temps de récupération d’échec augmente avec l’augmentation de la durée de l’intervalle de point de contrôle. En cas de défaillance, un pipeline doit retraiter toutes les données avant le point de contrôle précédent. Les utilisateurs peuvent envisager ce compromis entre une latence plus faible pendant le traitement normal et le temps de récupération en cas de défaillance.

Quelles sont les configurations associées au suivi de progression asynchrone ?

Option Valeur Default Description
asyncProgressTrackingEnabled true/false false activer ou désactiver le suivi de progression asynchrone
asyncProgressTrackingCheckpointIntervalMs millisecondes 1 000 intervalle dans lequel nous validons les décalages et les validations d’achèvement

Comment les utilisateurs peuvent-ils activer le suivi de progression asynchrone ?

Les utilisateurs peuvent utiliser du code similaire au code ci-dessous pour activer cette fonctionnalité :

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Désactivation du suivi de progression asynchrone

Quand le suivi de progression asynchrone est activé, le framework ne contrôle pas la progression pour chaque lot. Pour résoudre ce problème, avant de désactiver le suivi de progression asynchrone, traitez au moins deux microlots avec les paramètres suivants :

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Arrêtez la requête quand au moins deux microlots ont été entièrement traités. Vous pouvez maintenant désactiver de manière sûre le suivi de progression asynchrone et redémarrer la requête.

Si vous avez désactivé le suivi de progression asynchrone sans effectuer cette étape, vous pouvez rencontrer l’erreur suivante :

java.lang.IllegalStateException: batch x doesn't exist

Dans les journaux de pilote, vous pouvez voir l’erreur suivante :

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

En suivant les instructions de cette section pour désactiver le suivi asynchrone de la progression, vous pouvez résoudre ces erreurs et corriger votre charge de travail de streaming.

Limitations avec le suivi de progression asynchrone

Cette fonctionnalité présente les limitations suivantes :

  • Le suivi de progression asynchrone n’est pris en charge que dans les pipelines sans état lors de l’utilisation de Kafka comme récepteur.
  • Le traitement de bout en bout exactement une fois n’est pas garanti avec le suivi de progression asynchrone, car les plages de décalage pour le lot peuvent être modifiées en cas de défaillance. Certains récepteurs, tels que Kafka, n’offrent jamais de garanties exactement une seule fois.