Partager via


Exécuter un pipeline Delta Live Tables dans un workflow

Vous pouvez exécuter un pipeline Delta Live Tables dans un workflow de traitement des données en utilisant des travaux Databricks, Apache Airflow ou Azure Data Factory.

travaux

Vous pouvez orchestrer plusieurs tâches dans un travail Databricks en vue d’implémenter un workflow de traitement des données. Pour inclure un pipeline Delta Live Tables dans un travail, utilisez la tâche Pipeline au moment de la création d’un projet. Consultez Tâche du pipeline Delta Live Tables pour les projets.

Apache Airflow

Apache Airflow est une solution open source conçue pour la gestion et la planification des workflows de données. Airflow représente les workflows sous forme de graphes orientés acycliques (DAG) des opérations. Vous définissez un workflow dans un fichier Python, puis Airflow gère la planification et l’exécution. Pour plus d’informations sur l’installation et l’utilisation d’Airflow avec Azure Databricks, consultez Orchestrer des travaux Azure Databricks avec Apache Airflow.

Pour exécuter un pipeline Delta Live Tables dans le cadre d’un workflow Airflow, utilisez DatabricksSubmitRunOperator.

Spécifications

Pour permettre la prise en charge de Delta Live Tables par Airflow, vous avez besoin des éléments suivants :

Exemple

L’exemple suivant crée un DAG Airflow qui déclenche une mise à jour du pipeline Delta Live Tables ayant l’identificateur 8279d543-063c-4d63-9926-dae38e35ce8b :

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

Remplacez CONNECTION_ID par l’identificateur d’une connexion Airflow à votre espace de travail.

Enregistrez cet exemple dans le répertoire airflow/dags et utilisez l’interface utilisateur Airflow pour afficher et déclencher le DAG. Utilisez l’interface utilisateur Delta Live Tables pour afficher les détails de la mise à jour du pipeline.

Azure Data Factory

Azure Data Factory est un service ETL dans le cloud qui vous permet d’orchestrer les workflows d’intégration et de transformation des données. Azure Data Factory prend directement en charge l’exécution de tâches Azure Databricks dans un workflow, y compris les notebooks, les tâches JAR et les scripts Python. Vous pouvez également inclure un pipeline dans un workflow en appelant l’API Delta Live Tables à partir d’une activité WebAzure Data Factory. Par exemple, pour déclencher une mise à jour du pipeline à partir d’Azure Data Factory :

  1. Créez une fabrique de données ou ouvrez une fabrique de données existante.

  2. Une fois la création terminée, ouvrez la page de votre fabrique de données, puis cliquez sur la vignette Ouvrir Azure Data Factory Studio. L’interface utilisateur Azure Data Factory s’affiche.

  3. Créez un pipeline Azure Data Factory en sélectionnant Pipeline dans le menu déroulant Nouveau de l’interface utilisateur Azure Data Factory Studio.

  4. Dans la boîte à outils Activités, développez Général, puis glissez-déposez l’activité Web vers le canevas du pipeline. Cliquez sur l’onglet Paramètres et entrez les valeurs suivantes :

    Remarque

    En guise de bonne pratique de sécurité, quand vous vous authentifiez avec des outils, systèmes, scripts et applications automatisés, Databricks recommande d’utiliser des jetons d’accès personnels appartenant à des principaux de service et non des utilisateurs de l’espace de travail. Pour créer des jetons d’accès pour des principaux de service, consultez la section Gérer les jetons pour un principal de service.

    • URL : https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates.

      Remplacez <get-workspace-instance>.

      Remplacez <pipeline-id> par l’identificateur du pipeline.

    • Méthode : sélectionnez POST dans le menu déroulant.

    • En-têtes : cliquez sur + Nouveau. Dans la zone de texte Nom, entrez Authorization. Dans la zone de texte Valeur, entrez Bearer <personal-access-token>.

      Remplacez <personal-access-token> par un jeton d’accès personnel Azure Databricks.

    • Corps : pour passer des paramètres de requête supplémentaires, entrez un document JSON contenant les paramètres. Par exemple, pour démarrer une mise à jour et retraiter toutes les données du pipeline, entrez : {"full_refresh": "true"}. S’il n’y a pas de paramètres de requête supplémentaires à passer, entrez des accolades vides ({}).

Pour tester l’activité Web, cliquez sur Déboguer dans la barre d’outils du pipeline dans l’interface utilisateur Data Factory. La sortie et l’état de l’exécution, y compris les erreurs, sont affichés sous l’onglet Sortie du pipeline Azure Data Factory. Utilisez l’interface utilisateur Delta Live Tables pour afficher les détails de la mise à jour du pipeline.

Conseil

Pour les workflows, une exigence courante est de démarrer une tâche après l’achèvement d’une tâche précédente. Étant donné que la requête updates de Delta Live Tables est asynchrone, elle retourne le résultat après le démarrage de la mise à jour, mais avant la fin de la mise à jour : les tâches dans votre pipeline Azure Data Factory qui ont une dépendance avec la mise à jour de Delta Live Tables doivent donc attendre la fin de la mise à jour. Vous pouvez spécifier l’attente de la fin de la mise à jour en ajoutant une activité Until (Jusqu’à) après l’activité Web qui déclenche la mise à jour de Delta Live Tables. Dans l’activité Until :

  1. Ajoutez une activité Wait (Attendre) pour attendre pendant le nombre de secondes configuré la fin de la mise à jour.
  2. Ajoutez une activité Web après l’activité Wait qui utilise la requête d’obtention des détails de la mise à jour de Delta Live Tables pour connaître l’état de la mise à jour. Le champ state dans la réponse retourne l’état actuel de la mise à jour, y compris si elle est terminée.
  3. Utilisez la valeur du champ state pour définir la condition de fin de l’activité Until. Vous pouvez également utiliser une activité Set Variable (Définir une variable) pour ajouter une variable de pipeline basée sur la valeur de state et utiliser cette variable pour la condition de fin.