Diffusion en continu et ingestion incrémentielle
Azure Databricks utilise le flux structuré Apache Spark pour soutenir de nombreux produits associés aux charges de travail d’ingestion, notamment :
- Chargeur automatique
COPY INTO
- Pipelines Delta Live Tables
- Vues matérialisées et tables de diffusion en continu dans Databricks SQL
Cet article décrit certaines des différences entre la sémantique de traitement par lots de diffusion en continu et d’ingestion incrémentielle et fournit une vue d’ensemble générale de la configuration des charges de travail d’ingestion pour la sémantique souhaitée dans Databricks.
Quelle est la différence entre l’ingestion par lots incrémentielle et par diffusion en continu ?
Les configurations de flux de travail d’ingestion possibles vont du traitement en quasi-temps réel au traitement par lots incrémentiel peu fréquent. Les deux modèles utilisent le flux structuré Apache Spark pour alimenter le traitement incrémentiel, mais ont une sémantique différente. Par souci de simplicité, cet article fait référence à l’ingestion en quasi-temps réel en tant qu’ingestion de diffusion en continu et le traitement incrémentiel moins fréquent en tant qu’ingestion incrémentielle par lots.
Ingestion de streaming
La diffusion en continu, dans le contexte de l’ingestion des données et des mises à jour de tables, fait référence au traitement des données en quasi-temps réel, où Azure Databricks ingère des enregistrements de la source à un récepteur dans des micro-lots à l’aide de l’infrastructure always-on. Une charge de travail de diffusion en continu ingère en continu les mises à jour à partir de sources de données configurées, sauf si une défaillance qui arrête l’ingestion se produit.
Ingestion par lots incrémentielle
L’ingestion par lots incrémentielle fait référence à un modèle dans lequel tous les nouveaux enregistrements sont traités à partir d’une source de données par un travail de courte durée. L’ingestion par lots incrémentielle se produit souvent selon une planification, mais elle peut également être déclenchée manuellement ou en fonction de l’arrivée des fichiers.
L’ingestion par lots incrémentielle diffère de l’ingestion par lots en cela qu’elle détecte automatiquement les nouveaux enregistrements dans la source de données et ignore les enregistrements qui ont déjà été ingérés.
Ingestion avec des travaux
Les travaux Databricks vous permettent d’orchestrer des flux de travail et de planifier des tâches qui incluent des notebooks, des bibliothèques, des pipelines Delta Live Tables et des requêtes SQL Databricks.
Remarque
Vous pouvez utiliser tous les types de calcul et les types de tâche Azure Databricks pour configurer l’ingestion par lots incrémentielle. L’ingestion de diffusion en continu est prise en charge uniquement en production sur les travaux classiques de calcul et Delta Live Tables.
Les travaux ont deux modes de fonctionnement principaux :
- Les travaux continus sont réessayés automatiquement s’ils rencontrent un échec. Ce mode est destiné à l’ingestion de diffusion en continu.
- Les travaux déclenchés exécutent des tâches lorsqu’ils sont déclenchés. Les déclencheurs comprennent des :
- Déclencheurs basés sur le temps qui exécutent des travaux en suivant une planification spécifiée.
- Déclencheurs basés sur des fichiers qui exécutent des travaux lorsque des fichiers apparaissent à un emplacement spécifié.
- Autres déclencheurs, tels que les appels d’API REST, l’exécution de commandes CLI Azure Databricks, ou le fait de cliquer sur le bouton Exécuter maintenant dans l’interface utilisateur de l’espace de travail.
Pour les charges de travail par d’ingestion par lots incrémentielle, configurez vos travaux à l’aide du mode déclencheur AvailableNow
, comme suit :
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
Pour les charges de travail de diffusion en continu, l’intervalle de déclencheur par défaut est processingTime ="500ms"
. L’exemple suivant montre comment traiter un micro-lot toutes les 5 secondes :
Python
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
Scala
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
Important
Les travaux serverless ne prennent pas en charge Scala, le mode continu et les déclencheurs basés sur le temps pour le flux structuré. Utilisez des travaux classiques si vous avez besoin d’une sémantique d’ingestion en quasi-temps réel.
Ingestion avec Delta Live Tables
Comme avec les travaux, les pipelines Delta Live Tables peuvent s’exécuter en mode déclenché ou continu. Pour une sémantique de diffusion en continu en quasi-temps réel avec des tables de diffusion en continu, utilisez le mode continu.
Utilisez des tables de diffusion en continu pour configurer l’ingestion par lots incrémentielle ou de diffusion en continu à partir du stockage d’objets cloud, d’Apache Kafka, d’Amazon Brokers, de Google Pub/Sub ou d’Apache Pulsar.
LakeFlow Connect utilise Delta Live Tables pour configurer les pipelines d’ingestion à partir de systèmes connectés. Consultez LakeFlow Connect.
Les vues matérialisées garantissent une sémantique des opérations équivalente aux charges de travail par lots, mais peuvent optimiser de nombreuses opérations pour calculer les résultats de manière incrémentielle. Consultez Opérations d’actualisation pour obtenir des vues matérialisées.
Ingestion avec Databricks SQL
Vous pouvez utiliser des tables de diffusion en continu pour configurer l’ingestion par lots incrémentielle à partir du stockage d’objets cloud, d’Apache Kafka, d’Amazon Brokers, de Google Pub/Sub ou d’Apache Pulsar.
Vous pouvez utiliser des vues matérialisées pour configurer le traitement par lots incrémentiel à partir de sources pouvant être entièrement relues pour un ensemble d’opérations spécifié. Consultez Opérations d’actualisation pour obtenir des vues matérialisées.
COPY INTO
fournit une syntaxe SQL familière pour le traitement par lots incrémentiel des fichiers de données dans le stockage d’objets cloud. Le comportement COPY INTO
est similaire aux modèles pris en charge par les tables de diffusion en continu pour le stockage d’objets cloud, mais tous les paramètres par défaut ne sont pas équivalents pour tous les formats de fichier pris en charge.