다음을 통해 공유


Apache Airflow로 Azure Databricks 작업 조정

이 문서에서는 Azure Databricks를 사용하여 데이터 파이프라인을 조정하기 위한 Apache Airflow 지원을 설명하고, Airflow를 로컬에 설치하고 구성하는 방법에 대한 지침을 제공하며, Airflow를 사용하여 Azure Databricks 워크플로를 배포하고 실행하는 예를 제공합니다.

데이터 파이프라인의 작업 오케스트레이션

데이터 처리 파이프라인을 개발하고 배포하려면 태스크 간의 복잡한 종속성을 관리해야 하는 경우가 많습니다. 예를 들어 파이프라인은 원본에서 데이터를 읽고, 데이터를 정리하고, 정리된 데이터를 변환하고, 변환된 데이터를 대상에 쓸 수 있습니다. 파이프라인을 운영할 때는 테스트, 일정 관리, 오류 문제 해결에 대한 지원도 필요합니다.

워크플로 시스템은 작업 간의 종속성을 정의하고, 파이프라인이 실행되는 시기를 예약하고, 워크플로를 모니터링할 수 있도록 하여 이러한 문제를 해결합니다. Apache Airflow는 데이터 파이프라인을 관리하고 예약하기 위한 오픈 소스 솔루션입니다. Airflow는 데이터 파이프라인을 작업의 지시된 DAG(방향성 비순환 그래프)로 나타냅니다. Python 파일에서 워크플로를 정의하고 Airflow는 예약 및 실행을 관리합니다. Airflow Azure Databricks 연결을 사용하면 Airflow의 예약 기능을 통해 Azure Databricks에서 제공하는 최적화된 Spark 엔진을 활용할 수 있습니다.

요구 사항

  • Airflow와 Azure Databricks를 통합하려면 Airflow 버전 2.5.0 이상이 필요합니다. 이 문서의 예제는 Airflow 버전 2.6.1으로 테스트됩니다.
  • Airflow는 Python 3.8, 3.9, 3.10 또는 3.11이 필요합니다. 이 문서의 예제는 Python 3.8을 사용하여 테스트됩니다.
  • 이 문서에서는 Airflow를 설치하고 실행하는 방법을 설명하며, 이를 위해서는 pipenv를 사용하여 Python 가상 환경을 만들어야 합니다.

Databricks에 대한 Airflow 연산자

Airflow DAG는 여러 작업으로 구성되며, 각 작업은 Airflow 연산자를 실행합니다. Databricks와의 통합을 지원하는 Airflow 연산자는 Databricks 공급자에 구현되어 있습니다.

Databricks 공급자에는 Azure Databricks 작업 영역에 대해 다양한 작업을 실행하는 연산자가 포함되어 있습니다. 여기에는 테이블에 데이터 가져오기, SQL 쿼리 실행, Databricks Git 폴더 작업 등이 포함됩니다.

Databricks 공급자는 작업을 트리거하기 위해 두 가지 연산자를 구현합니다.

  • DatabricksRunNowOperator에는 기존 Azure Databricks 작업이 필요하고 POST /api/2.1/jobs/run-now API 요청을 사용하여 실행을 트리거합니다. Databricks는 작업 정의의 중복을 줄이고 이 연산자를 사용하여 트리거된 작업 실행은 작업 UI에서 찾을 수 있으므로 DatabricksRunNowOperator을(를) 사용하는 것이 좋습니다.
  • DatabricksSubmitRunOperator는 Azure Databricks에 작업이 필요하지 않으며 만들기 및 POST /api/2.1/jobs/runs/submit API 요청을 사용하여 작업 사양을 제출하고 실행을 트리거합니다.

새로운 Azure Databricks 작업을 만들거나 기존 작업을 재설정하기 위해 Databricks 공급자는 DatabricksCreateJobsOperator를 구현합니다. DatabricksCreateJobsOperator에서는 POST /api/2.1/jobs/createPOST /api/2.1/jobs/reset API 요청을 사용합니다. DatabricksCreateJobsOperator을(를) DatabricksRunNowOperator과(와) 함께 사용하면 작업을 만들고 실행할 수 있습니다.

참고 항목

Databricks 연산자를 사용하여 작업을 트리거하려면 Databricks 연결 구성에서 자격 증명을 제공해야 합니다. Airflow에 대한 Azure Databricks 개인용 액세스 토큰 만들기를 참조하세요.

Databricks Airflow 연산자는 작업 실행 페이지 URL을 Airflow 로그에 polling_period_seconds마다 씁니다(기본값은 30초). 자세한 내용은 Airflow 웹 사이트의 apache-airflow-providers-databricks 패키지 페이지를 참조하세요.

로컬에서 Airflow Azure Databricks 통합 설치

테스트 및 개발을 위해 Airflow와 Databricks 공급자를 로컬에 설치하려면 다음 단계를 따르세요. 프로덕션 설치 생성을 포함한 기타 Airflow 설치 옵션에 대한 자세한 내용은 Airflow 설명서의 설치를 참조하세요.

터미널을 열고 다음 명령을 실행합니다.

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

<firstname>, <lastname><email>을 사용자 이름 및 이메일로 바꿉니다. 관리 사용자의 비밀번호를 입력하라는 메시지가 표시됩니다. Airflow UI에 로그인하는 데 필요하므로 이 비밀번호를 꼭 저장해 두세요.

이 스크립트는 다음 단계를 수행합니다.

  1. 이름이 지정된 airflow 디렉터리를 만들고 해당 디렉터리로 변경합니다.
  2. pipenv을 사용하여 Python 가상 환경을 만들고 생성하는 데 사용합니다. Databricks는 Python 가상 환경을 사용하여 패키지 버전 및 코드 종속성을 해당 환경으로 격리하는 것이 좋습니다. 이러한 격리는 예기치 않은 패키지 버전 불일치 및 코드 종속성 충돌을 줄이는 데 도움이 됩니다.
  3. airflow 디렉터리의 경로로 설정된 환경 변수 AIRFLOW_HOME을(를) 초기화합니다.
  4. Airflow 및 Airflow Databricks 공급자 패키지를 설치합니다.
  5. airflow/dags 디렉터리를 만듭니다. Airflow는 dags 디렉터리를 사용하여 DAG 정의를 저장합니다.
  6. Airflow에서 메타데이터를 추적하는 데 사용하는 SQLite 데이터베이스를 초기화합니다. 프로덕션 Airflow 배포에서는 표준 데이터베이스를 사용하여 Airflow를 구성합니다. Airflow 배포에 대한 SQLite 데이터베이스 및 기본 구성이 airflow 디렉터리에서 초기화됩니다.
  7. Airflow에 대한 관리 사용자를 만듭니다.

Databricks 공급자가 설치되었는지 확인하려면 Airflow 설치 디렉터리에서 다음 명령을 실행하세요.

airflow providers list

Airflow 웹 서버 및 스케줄러 시작

Airflow UI를 보려면 Airflow 웹 서버가 필요합니다. 웹 서버를 시작하려면 Airflow 설치 디렉터리에서 터미널을 열고 다음 명령을 실행하세요.

참고 항목

포트 충돌로 인해 Airflow 웹 서버가 시작되지 않는 경우 Airflow 구성에서 기본 포트를 변경할 수 있습니다.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

스케줄러는 DAG를 예약하는 Airflow 구성 요소입니다. 스케줄러를 시작하려면 Airflow 설치 디렉터리에서 새 터미널을 열고 다음 명령을 실행하세요.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Airflow 설치 테스트

Airflow 설치를 확인하려면 Airflow에 포함된 예제 DAG 중 하나를 실행할 수 있습니다.

  1. 브라우저 창에서 http://localhost:8080/home을(를) 엽니다. Airflow를 설치할 때 생성한 사용자 이름과 비밀번호로 Airflow UI에 로그인합니다. Airflow DAG 페이지가 나타납니다.
  2. DAG 일시 중지/일시 중지 해제 토글을 클릭하여 예제 DAG 중 하나를 일시 중지 해제합니다(예example_python_operator).
  3. DAG 트리거 단추를 클릭하여 예제 DAG를 트리거합니다.
  4. DAG의 실행 상태를 포함하여 세부 정보를 보려면 DAG 이름을 클릭합니다.

Airflow에 대한 Azure Databricks 개인용 액세스 토큰 만들기

Airflow는 Azure Databricks PAT(개인용 액세스 토큰)를 사용하여 Databricks에 연결합니다. PAT를 만들려면 작업 영역 사용자를 위한 Azure Databricks 개인용 액세스 토큰의 단계를 따릅니다.

참고 항목

보안 모범 사례로, 자동화된 도구, 시스템, 스크립트, 앱을 사용하여 인증할 때 Databricks는 작업 영역 사용자 대신 서비스 주체에 속한 개인용 액세스 토큰을 사용하는 것을 권장합니다. 서비스 주체에 대한 토큰을 만들려면 서비스 주체에 대한 토큰 관리를 참조하세요.

Microsoft Entra ID 토큰을 사용하여 Azure Databricks에 인증할 수도 있습니다. Airflow 설명서에서 Databricks 연결을 참조하세요.

Azure Databricks 연결 구성

Airflow 설치에는 Azure Databricks에 대한 기본 연결이 포함되어 있습니다. 위에서 만든 개인용 액세스 토큰을 사용하여 작업 영역에 연결하도록 연결을 업데이트하려면 다음을 수행합니다.

  1. 브라우저 창에서 http://localhost:8080/connection/list/을(를) 엽니다. 로그인하라는 메시지가 표시되면 관리 사용자 이름과 비밀번호를 입력하세요.
  2. Conn ID에서 databricks_default를 찾아 레코드 편집 단추를 클릭합니다.
  3. 호스트 필드의 값을 Azure 데이터브릭스 배포의 작업 영역 인스턴스 이름(예: https://adb-123456789.cloud.databricks.com)으로 바꿉니다.
  4. 비밀번호 필드에 Azure Databricks 개인용 액세스 토큰을 입력합니다.
  5. 저장을 클릭합니다.

Microsoft Entra ID 토큰을 사용하는 경우 Airflow 설명서의 Databricks Connection에서 인증 구성에 대한 자세한 내용을 참조하세요.

예: Azure Databricks 작업을 실행하기 위한 Airflow DAG 만들기

다음 예제에서는 로컬 컴퓨터에서 실행되고 Azure Databricks에서 실행을 트리거하는 예제 DAG를 배포하는 간단한 Airflow 배포를 만드는 방법을 보여 줍니다. 이 예에서는 다음을 수행합니다.

  1. 새 Notebook을 만들고 구성된 매개 변수를 기반으로 인사말을 인쇄하는 코드를 추가합니다.
  2. Notebook을 실행하는 단일 작업을 사용하여 Azure Databricks 작업을 만듭니다.
  3. Azure Databricks 작업 영역에 대한 Airflow 연결을 구성합니다.
  4. Airflow DAG를 만들어 Notebook 작업을 트리거합니다. DatabricksRunNowOperator을(를) 사용하여 Python 스크립트에서 DAG를 정의합니다.
  5. Airflow UI를 사용하여 DAG를 트리거하고 실행 상태를 확인합니다.

Notebook 만들기

이 예제에서는 두 개의 셀이 포함된 Notebook을 사용합니다.

  • 첫 번째 셀에는 기본값 world(으)로 설정된 변수 greeting을(를) 정의하는 Databricks Utilities 텍스트 위젯이 포함되어 있습니다.
  • 두 번째 셀은 접두사 hello로 된 greeting 변수의 값을 인쇄합니다.

Notebook을 만들려면 다음을 수행합니다.

  1. Azure Databricks 작업 영역으로 이동하여 사이드바에서 새 아이콘 새로 만들기를 클릭하고 Notebook을 선택합니다.

  2. Notebook에 Hello Airflow와 같은 이름을 지정하고 기본 언어가 Python으로 설정되어 있는지 확인하세요.

  3. 다음 Python 코드를 복사하여 Notebook의 첫 번째 셀에 붙여넣습니다.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. 첫 번째 셀 아래에 새 셀을 추가하고 다음 Python 코드를 복사하여 새 셀에 붙여넣습니다.

    print("hello {}".format(greeting))
    

작업 만들기

  1. 사이드바에서 워크플로 아이콘 워크플로를 클릭합니다.

  2. 작업 만들기 버튼을 클릭합니다.

    작업 탭에 작업 만들기 대화 상자가 표시됩니다.

    첫 번째 작업 만들기 대화 상자

  3. 작업 이름 추가...를 작업 이름으로 바꿉니다.

  4. 작업 이름 필드에 작업의 이름(예: greeting-task)을 입력합니다.

  5. 형식 드롭다운 메뉴에서 Notebook을 선택합니다.

  6. 원본 드롭다운 메뉴에서 작업 영역을 선택합니다.

  7. 경로 텍스트 상자를 클릭하고 파일 브라우저를 사용하여 생성한 Notebook을 찾은 다음, Notebook 이름을 클릭하고 확인을 클릭합니다.

  8. 매개 변수 아래에서 추가를 클릭합니다. 필드에 greeting을(를) 입력합니다. 필드에 Airflow user를 입력합니다.

  9. 작업 만들기를 클릭합니다.

작업 세부 정보 패널에서 작업 ID 값을 복사합니다. 이 값은 Airflow에서 작업을 트리거하는 데 필요합니다.

작업 실행

Azure Databricks 작업 UI에서 새 작업을 테스트하려면 오른쪽 위 모서리에 있는 지금 실행 버튼을(를) 클릭합니다. 실행이 완료되면 작업 실행 세부 정보를 보고 출력을 확인할 수 있습니다.

새 Airflow DAG 만들기

Python 파일에서 Airflow DAG를 정의합니다. 예제 Notebook 작업을 트리거하는 DAG를 만들려면 다음을 수행합니다.

  1. 텍스트 편집기 또는 IDE에서 다음 내용으로 databricks_dag.py라고 명명된 새 파일을 만듭니다.

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    JOB_ID을(를) 이전에 저장한 작업 ID의 값으로 바꿉니다.

  2. 파일을 airflow/dags 디렉터리에 저장합니다. Airflow는 자동으로 airflow/dags/에 저장된 DAG 파일을 읽고 설치합니다.

Airflow에서 DAG 설치 및 확인

Airflow UI에서 DAG를 트리거하고 확인하려면 다음을 수행합니다.

  1. 브라우저 창에서 http://localhost:8080/home을(를) 엽니다. Airflow DAG 화면이 나타납니다.
  2. databricks_dag을(를) 찾아 DAG 일시 중지/일시 중지 해제 토글을 클릭하여 DAG를 일시 중지 해제합니다.
  3. DAG 트리거 단추를 클릭하여 DAG를 트리거합니다.
  4. 실행 열에서 실행을 클릭하여 실행의 상태 및 세부 정보를 확인합니다.