Eseguire una pipeline esistente con gestore dell'orchestrazione del flusso di lavoro
SI APPLICA A: Azure Data Factory Azure Synapse Analytics
Suggerimento
Provare Data Factory in Microsoft Fabric, una soluzione di analisi all-in-one per le aziende. Microsoft Fabric copre tutto, dallo spostamento dati al data science, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Vedere le informazioni su come iniziare una nuova prova gratuita!
Nota
Il gestore dell'orchestrazione del flusso di lavoro dispone di tecnologia Apache Airflow.
Nota
Gestore dell'orchestrazione del flusso di lavoro per Azure Data Factory si basa sull'applicazione Apache Airflow open source. La documentazione e altre esercitazioni per Airflow sono disponibili nelle pagine della documentazione o della community di Apache Airflow.
Le pipeline di Data Factory offrono oltre 100 connettori di origine dati che offrono flussi di dati/integrazione dati scalabili e affidabili. Esistono scenari in cui si vuole eseguire una pipeline di data factory esistente dal daG Apache Airflow. Questa esercitazione illustra come eseguire questa operazione.
Prerequisiti
- Sottoscrizione di Azure. Se non si ha una sottoscrizione di Azure, creare un account Azure gratuito prima di iniziare.
- Account di archiviazione di Azure. Se non si ha un account di archiviazione, vedere Creare un account di archiviazione di Azure per informazioni su come crearne uno. Assicurarsi che l'account di archiviazione consenta l'accesso solo da reti selezionate.
- Pipeline di Azure Data Factory. È possibile seguire una qualsiasi delle esercitazioni e creare una nuova pipeline di data factory nel caso in cui non ne sia già disponibile una o crearne una con una selezione in Introduzione e provare la prima pipeline della data factory.
- Configurare un'entità servizio. Sarà necessario creare una nuova entità servizio o usarne una esistente e concedergli l'autorizzazione per eseguire la pipeline (ad esempio il ruolo collaboratore nella data factory in cui esistono le pipeline esistenti), anche se l'ambiente di Workflow Orchestration Manager e le pipeline esistono nella stessa data factory. Sarà necessario ottenere l'ID client dell'entità servizio e il segreto client (chiave API).
Passaggi
Creare un nuovo file Python adf.py con il contenuto seguente:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Sarà necessario creare la connessione usando l'amministratore dell'interfaccia utente di Workflow Orchestration Manager -> Connessioni -> '+' -> Scegliere 'Tipo di connessione' come 'Azure Data Factory', quindi compilare il client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name e pipeline_name.
Caricare il file adf.py nell'archivio BLOB all'interno di una cartella denominata DAGS.
Importare la cartella DAGS nell'ambiente workflow Orchestration Manager. Se non ne hai uno, crearne uno nuovo