다음을 통해 공유


Git 폴더 또는 작업 영역 파일에서 Python 모듈 가져오기

Databricks Git 폴더 또는 작업 영역 파일에 Python 코드를 저장한 다음 해당 Python 코드를 Delta Live Tables 파이프라인으로 가져올 수 있습니다. Git 폴더 또는 작업 영역 파일에서 모듈을 사용하는 방법에 대한 자세한 내용은 Python 및 R 모듈 작업을 참조 하세요.

참고 항목

Databricks Git 폴더 또는 작업 영역 파일에 저장된 Notebook에서 소스 코드를 가져올 수 없습니다. 대신 파이프라인을 만들거나 편집할 때 Notebook을 직접 추가합니다. Delta Live Tables 파이프라인 구성을 참조하세요.

Delta Live Tables 파이프라인으로 Python 모듈 가져오기

다음 예제에서는 작업 영역 파일에서 Python 모듈로 데이터 세트 쿼리를 가져오는 방법을 보여 줍니다. 이 예제에서는 작업 영역 파일을 사용하여 파이프라인 소스 코드를 저장하는 방법을 설명하지만 Git 폴더에 저장된 소스 코드와 함께 사용할 수 있습니다.

이 예제를 실행하려면 다음 단계를 사용합니다.

  1. Azure Databricks 작업 영역의 사이드바에서 작업 영역을 클릭하여 작업 영역 아이콘 작업 영역 브라우저를 엽니다.

  2. 작업 영역 브라우저를 사용하여 Python 모듈의 디렉터리를 선택합니다.

  3. 선택한 디렉터리의 맨 오른쪽 열을 클릭하고 케밥 메뉴 파일 만들기 > 를 클릭합니다.

  4. 파일의 이름을 입력합니다(예: clickstream_raw_module.py.). 파일 편집기가 열립니다. 원본 데이터를 테이블로 읽는 모듈을 만들려면 편집기 창에 다음을 입력합니다.

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  5. 준비된 데이터가 포함된 새 테이블을 만드는 모듈을 만들려면 동일한 디렉터리에 새 파일을 만들고 파일 clickstream_prepared_module.py의 이름을 입력한 다음 새 편집기 창에 다음을 입력합니다.

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  6. 다음으로 파이프라인 Notebook을 만듭니다. Azure Databricks 방문 페이지로 이동하여 Notebook 만들기를 선택하거나 사이드바에서 새로 만들기를 클릭하고 새 아이콘 Notebook을 선택합니다. 작업 영역 브라우저에서 전자 필기장 만들기를 클릭하고 클릭하여 케밥 메뉴 전자 필기장을 만들 > 수도 있습니다.

  7. Notebook 이름을 지정하고 Python이 기본 언어인지 확인합니다.

  8. 만들기를 클릭합니다.

  9. Notebook에 예제 코드를 입력합니다.

    참고 항목

    Notebook이 작업 영역 파일 경로 또는 Notebook 디렉터리와 다른 Git 폴더 경로에서 모듈 또는 패키지를 가져오는 경우 다음을 사용하여 sys.path.append()파일에 경로를 수동으로 추가해야 합니다.

    Git 폴더에서 파일을 가져오는 경우 경로 앞에 추가 /Workspace/ 해야 합니다. 예들 들어 sys.path.append('/Workspace/...')입니다. 경로에서 생략하면 /Workspace/ 오류가 발생합니다.

    모듈 또는 패키지가 Notebook과 동일한 디렉터리에 저장되는 경우 경로를 수동으로 추가할 필요가 없습니다. 또한 루트 디렉터리가 경로에 자동으로 추가되므로 Git 폴더의 루트 디렉터리에서 가져올 때 경로를 수동으로 추가할 필요가 없습니다.

    import sys, os
    # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook.
    sys.path.append(os.path.abspath('<module-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        spark.read.table("LIVE.clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    가져올 Python 모듈이 포함된 디렉터리의 경로로 바꿉 <module-path> 니다.

  10. 새 Notebook을 사용하여 파이프라인을 만듭니다.

  11. 파이프라인을 실행하려면 파이프라인 세부 정보 페이지에서 시작을 클릭합니다.

Python 코드를 패키지로 가져올 수도 있습니다. Delta Live Tables Notebook의 다음 코드 조각은 Notebook과 test_utils 동일한 디렉터리 내의 디렉터리에서 dlt_packages 패키지를 가져옵니다. dlt_packages 디렉터리에는 파일 test_utils.py__init__.py다음 test_utils.py 함수create_test_table()가 정의됩니다.

import dlt

@dlt.table
def my_table():
  return spark.read.table(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)