Condividi tramite


Orchestrare i processi di Azure Databricks con Apache Airflow

Questo articolo descrive il supporto di Apache Airflow per l'orchestrazione di pipeline di dati con Azure Databricks, contiene istruzioni per l'installazione e la configurazione di Airflow a livello locale e fornisce un esempio di distribuzione ed esecuzione di un workflow di Azure Databricks con Airflow.

Orchestrazione dei processi in una pipeline di dati

Lo sviluppo e la distribuzione di una pipeline di elaborazione dati spesso richiedono la gestione di dipendenze complesse tra le attività. Ad esempio, una pipeline potrebbe leggere i dati da un'origine, pulire i dati, trasformare i dati puliti e scrivere i dati trasformati in una destinazione. È inoltre necessario un supporto per il test, la programmazione e la risoluzione degli errori quando si rende operativa una pipeline.

I sistemi di workflow affrontano queste sfide consentendo di definire le dipendenze tra le attività, di programmare l'esecuzione delle pipeline e di monitorare i flussi di lavoro. Apache Airflow è una soluzione open source per la gestione e la pianificazione delle pipeline di dati. Airflow rappresenta le pipeline di dati come grafi aciclici diretti (DAG) di operazioni. Si definisce un flusso di lavoro in un file Python e Airflow ne gestisce la programmazione e l'esecuzione. La connessione Airflow di Azure Databricks consente di sfruttare il motore Spark ottimizzato offerto da Azure Databricks con le funzionalità di pianificazione di Airflow.

Requisiti

  • L'integrazione tra Airflow e Azure Databricks richiede Airflow nella versione 2.5.0 e successive. Gli esempi in questo articolo vengono testati con Airflow nella versione 2.6.1.
  • Airflow richiede Python 3.8, 3.9, 3.10, o 3.11. Gli esempi in questo articolo vengono testati con Python 3.8.
  • Le istruzioni in questo articolo per installare ed eseguire Airflow richiedono pipenv per creare un ambiente virtuale Python.

Operatori Airflow per Databricks

Un DAG Airflow è composto da attività, dove ogni attività esegue un Operatore Airflow. Gli operatori Airflow che supportano l'integrazione con Databricks vengono implementati nel provider Databricks.

Il provider Databricks comprende operatori per l'esecuzione di una serie di attività in un'area di lavoro di Azure Databricks, tra cui l'importazione di dati in una tabella, l'esecuzione di query SQL e il lavoro con le cartelle Git di Databricks.

Il provider Databricks implementa due operatori per l'attivazione dei processi:

Per creare un nuovo processo di Azure Databricks o reimpostare un processo esistente, il provider Databricks implementa DatabricksCreateJobsOperator. Il DatabricksCreateJobsOperator usa le richieste API POST /api/2.1/jobs/create e POST /api/2.1/jobs/reset. È possibile usare DatabricksCreateJobsOperator con DatabricksRunNowOperator per creare ed eseguire un processo.

Nota

Per utilizzare gli operatori di Databricks per attivare un processo è necessario fornire le credenziali nella configurazione della connessione a Databricks. Consultare Creare un token di accesso personale di Azure Databricks per la connessione Airflow.

Gli operatori Databricks Airflow scrivono l'URL della pagina di esecuzione del processo nei log Airflow ogni polling_period_seconds (il valore predefinito è 30 secondi). Per altre informazioni, consultare la pagina del pacchetto apache-airflow-providers-databricks nel sito Web Airflow.

Installare l'integrazione di Airflow di Azure Databricks in locale

Per installare Airflow e il provider Databricks in locale per il test e lo sviluppo, seguire questa procedura. Per altre opzioni di installazione di Airflow, inclusa la creazione di un'installazione di produzione, consultare installazione nella documentazione di Airflow.

Aprire un terminale ed eseguire i comandi seguenti:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Sostituire <firstname>, <lastname>, e <email> con il nome utente e l'e-mail. Verrà richiesto di inserire una password per l'utente amministratore. Assicurarsi di salvare questa password perché è necessaria per accedere all'interfaccia utente di Airflow.

Lo script esegue quindi i passaggi seguenti:

  1. Crea una directory denominata airflow e cambia in tale directory.
  2. Usa pipenv per creare e generare un ambiente virtuale Python. Databricks consiglia di utilizzare un ambiente virtuale Python per isolare le versioni dei pacchetti e le dipendenze del codice in quell'ambiente. Questo isolamento aiuta a ridurre le discrepanze tra le versioni dei pacchetti e le interferenze tra le dipendenze del codice.
  3. Inizializza una variabile di ambiente denominata AIRFLOW_HOME impostata sul percorso della directoryairflow.
  4. Installa Airflow e i pacchetti del provider Airflow Databricks.
  5. Crea una directory airflow/dags. Airflow usa la dags directory per archiviare le definizioni di DAG.
  6. Inizializza un database SQLite usato da Airflow per tenere traccia dei metadati. In una distribuzione Airflow di produzione, si deve configurare Airflow con un database standard. Il database SQLite e la configurazione predefinita per la distribuzione Airflow vengono inizializzati nella directory airflow.
  7. Crea un utente amministratore per Airflow.

Suggerimento

Per confermare l'installazione del provider Databricks, eseguire il seguente comando nella directory di installazione di Airflow:

airflow providers list

Avviare il server Web Airflow e l'utilità di pianificazione

Il server Web Airflow è necessario per visualizzare l'interfaccia utente airflow. Per avviare il server Web, aprire un terminale nella directory di installazione di Airflow ed eseguire i seguenti comandi:

Nota

Se l'avvio del server Web Airflow non riesce a causa di un conflitto di porte, è possibile modificare la porta predefinita nella configurazione di Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

L'utilità di pianificazione è il componente Airflow che pianifica i DAG. Per avviare l’unità di pianificazione, aprire un terminale nella directory di installazione di Airflow ed eseguire i seguenti comandi:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Test dell'installazione di Airflow

Per verificare l'installazione di Airflow, è possibile eseguire uno dei DAG di esempio inclusi in Airflow:

  1. In una finestra del browser aprire http://localhost:8080/home. Accedere all'interfaccia utente di Airflow con il nome utente e la password creati durante l'installazione di Airflow. Viene visualizzata la pagina DaG di Airflow.
  2. Fare clic sull'interruttore DAG Pausa/Riattiva per annullare la sospensione di uno dei gruppi di disponibilità di esempio, ad esempio example_python_operator.
  3. Attivare il DAG di esempio facendo clic sul pulsante Trigger DAG.
  4. Fare clic sul nome del DAG per visualizzare i dettagli, incluso lo stato di esecuzione del DAG.

Creare un token di accesso personale di Azure Databricks per Airflow

Airflow si connette a Databricks usando un token di accesso personale di Azure Databricks (PAT). Per creare un token di accesso personale personale di Azure Databricks, seguire i passaggi per gli utenti dell'area di lavoro.

Nota

Come procedura consigliata per la sicurezza, quando si esegue l'autenticazione con strumenti automatizzati, sistemi, script e app, Databricks consiglia di usare token di accesso personali appartenenti alle entità servizio, anziché agli utenti dell'area di lavoro. Per creare token per le entità servizio, consultare Gestire i token per un'entità servizio.

È anche possibile eseguire l'autenticazione in Azure Databricks usando un token Microsoft Entra ID. Consultare Connessione a Databricks nella documentazione di Airflow.

Configurare una connessione Azure Databricks

L'installazione di Airflow contiene una connessione predefinita per Azure Databricks. Per aggiornare la connessione per connettersi all'area di lavoro usando il token di accesso personale creato in precedenza:

  1. In una finestra del browser aprire http://localhost:8080/connection/list/. Se viene richiesto di effettuare l'accesso, inserire il nome utente e la password dell'amministratore.
  2. In ID Conn, individuare databricks_default e fare clic sul pulsante Modifica record.
  3. Sostituire il valore nel campo Host con il nome dell'istanza dell'area di lavoro della distribuzione di Azure Databricks, ad esempio https://adb-123456789.cloud.databricks.com.
  4. Nel campo Password immettere il token di accesso personale di Azure Databricks.
  5. Fare clic su Salva.

Se si usa un token ID Microsoft Entra, consultare Connessione a Databricks nella documentazione di Airflow per informazioni sulla configurazione dell'autenticazione.

Esempio: Creare un DAG airflow per eseguire un processo di Azure Databricks

Il seguente esempio mostra come creare una semplice distribuzione Airflow che viene eseguita sul computer locale e implementa un DAG di esempio per attivare le esecuzioni in Azure Databricks. Nel seguente esempio, si eseguirà quanto segue:

  1. Creare un nuovo notebook e aggiungere codice per stampare un messaggio di saluto in base a un parametro configurato.
  2. Creare un processo di Azure Databricks con una singola attività che esegue il notebook.
  3. Configurare una connessione Airflow all'area di lavoro di Azure Databricks.
  4. Creare un DAG Airflow per attivare il processo del notebook. Il dag viene definito in uno script Python usando DatabricksRunNowOperator.
  5. Usare l'interfaccia utente Airflow per attivare il DAG e visualizzare lo stato dell'esecuzione.

Creare un notebook

Questo esempio usa un notebook contenente due celle:

  • La prima cella contiene un widget di testo Databricks Utilities che definisce una variabile denominata greeting impostata sul valore predefinito world.
  • La seconda cella stampa il valore della variabile greeting preceduta da hello.

Per creare il notebook:

  1. Passare all'area di lavoro di Azure Databricks, fare clic su Nuova icona Nuovo nella barra laterale e selezionare Notebook.

  2. Assegnare un nome al notebook, ad esempio Hello Airflow, e assicurarsi che il linguaggio predefinito sia impostato su Python.

  3. Copiare il seguente codice Python e incollarlo nella prima cella del notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Aggiungere una nuova cella sotto la prima e copiare e incollare il seguente codice Python nella nuova cella:

    print("hello {}".format(greeting))
    

Creare un processo

  1. Cliccare Icona Flussi di lavoro Flussi di lavoro nella barra laterale.

  2. Cliccare Pulsante Crea processo.

    Viene visualizzata la scheda Attività con la finestra di dialogo Crea attività.

    Finestra di dialogo Crea prima attività

  3. Sostituire Aggiungi un nome per il processo… con il nome del processo.

  4. Nel campo Nome attività immettere un nome per l'attività, ad esempio greeting-task.

  5. Nel menu a discesa Tipo selezionare Notebook.

  6. Nel menu a discesa Origine, selezionare Area di lavoro.

  7. Fare clic sulla casella di testo Percorso e utilizzare il browser dei file per trovare il notebook creato, fare clic sul nome del notebook e poi su Conferma.

  8. Fare clic su Aggiungi in Parametri. Nel campo Chiave immettere greeting. Nel campo Valore immettere Airflow user.

  9. Fare clic su Crea token.

Nel pannello Dettagli processo copiare il valore ID processo. Questo valore è necessario per attivare il processo da Airflow.

Eseguire il processo

Per testare il nuovo processo nell'interfaccia utente dei processi di Azure Databricks, fare clic su Pulsante Esegui ora nell'angolo in alto a destra. Al termine dell'esecuzione, è possibile verificare l'output visualizzando i dettagli dell'esecuzione del processo.

Creare un nuovo DAG Airflow

Si definisce un DAG Airflow in un file Python. Per creare un DAG per attivare il processo del notebook di esempio:

  1. In un editor di testo o in un IDE, create un nuovo file denominato databricks_dag.py con i seguenti contenuti:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Sostituire JOB_ID con il valore dell'ID processo salvato in precedenza.

  2. Salvare questo file nella directory airflow/dags. Airflow legge e installa automaticamente i file DAG archiviati in airflow/dags/.

Installare e verificare il DAG in Airflow

Per attivare e verificare il DAG nell'interfaccia utente Airflow:

  1. In una finestra del browser aprire http://localhost:8080/home. Viene visualizzata la schermata DaG Airflow.
  2. Individuare databricks_dag e fare clic sull'interruttore DAG Pausa/Riattiva per rimuovere il DAG.
  3. Attivare il DAG facendo clic sul pulsante Trigger DAG.
  4. Fare clic su un'esecuzione nella colonna Esecuzioni per visualizzare lo stato e i dettagli dell'esecuzione.