워크플로 오케스트레이션 매니저를 사용하여 기존 파이프라인 실행
적용 대상: Azure Data Factory Azure Synapse Analytics
팁
기업용 올인원 분석 솔루션인 Microsoft Fabric의 Data Factory를 사용해 보세요. Microsoft Fabric은 데이터 이동부터 데이터 과학, 실시간 분석, 비즈니스 인텔리전스 및 보고에 이르기까지 모든 것을 다룹니다. 무료로 새 평가판을 시작하는 방법을 알아봅니다!
참고 항목
워크플로 오케스트레이션 매니저는 Apache Airflow를 통해 구동됩니다.
참고 항목
Azure Data Factory용 워크플로 오케스트레이션 매니저는 오픈 소스 Apache Airflow 애플리케이션을 사용합니다. Airflow에 대한 설명서 및 추가 자습서는 Apache Airflow 설명서 또는 커뮤니티 페이지에서 찾을 수 있습니다.
Data Factory 파이프라인은 확장 가능하고 안정적인 데이터 통합/데이터 흐름을 제공하는 100개 이상의 데이터 원본 커넥터를 제공합니다. Apache Airflow DAG에서 기존 데이터 팩터리 파이프라인을 실행하려는 시나리오가 있습니다. 이 자습서에서는 이 작업을 수행하는 방법을 보여줍니다.
필수 조건
- Azure 구독. Azure 구독이 아직 없는 경우 시작하기 전에 Azure 체험 계정을 만듭니다.
- Azure 스토리지 계정. 스토리지 계정이 없는 경우 Azure Storage 계정 만들기를 참조하세요. 스토리지 계정이 선택한 네트워크에서만 액세스를 허용하는지 확인합니다.
- Azure Data Factory 파이프라인. 아직 없는 경우 자습서에 따라 새 데이터 팩터리 파이프라인을 만들거나 시작 및 첫 번째 데이터 팩터리 파이프라인 사용해 보기에서 만들 수도 있습니다.
- 서비스 주체 설정. 워크플로 오케스트레이션 관리자 환경과 파이프라인이 동일한 데이터 팩터리에 있는 경우에도 새 서비스 주체를 만들거나 기존 서비스 주체를 사용하고 파이프라인(예: 기존 파이프라인이 있는 데이터 팩터리의 기여자 역할)을 실행할 수 있는 권한을 부여해야 합니다. 서비스 주체의 클라이언트 ID 및 클라이언트 암호(API 키)를 가져와야 합니다.
단계
아래 내용을 사용하여 새 Python 파일 adf.py를 만듭니다.
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
워크플로 오케스트레이션 관리자 UI 관리자 - 연결 -> '+' ->> '연결 유형'을 'Azure Data Factory'로 선택한 다음, client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name 및 pipeline_name 입력해야 합니다.
DAGS라는 폴더 내에서 Blob Storage에 adf.py 파일을 업로드합니다.
DAGS 폴더를 워크플로 오케스트레이션 관리자 환경으로 가져옵니다. 계정이 없는 경우 새 계정을 만듭니다.