다음을 통해 공유


자습서: Spark SQL을 사용하여 COPY INTO

Databricks는 수천 개의 파일이 포함된 데이터 원본에 대한 증분 및 대량 데이터 로드에 COPY INTO 명령을 사용할 것을 권장합니다. Databricks는 고급 사용 사례에 자동 로더를 사용하는 것이 좋습니다.

이 자습서에서는 COPY INTO 명령을 사용하여 클라우드 개체 스토리지의 데이터를 Azure Databricks 작업 영역의 테이블로 로드합니다.

요구 사항

  1. Azure 구독, 해당 구독의 Azure Databricks 작업 영역 및 해당 작업 영역의 클러스터가 필요합니다. 이러한 항목을 만들려면 빠른 시작: Azure Portal을 사용하여 Azure Databricks 작업 영역에서 Spark 작업 실행을 참조하세요. 이 빠른 시작을 따르는 경우 Spark SQL 작업 실행 섹션의 지침을 따를 필요가 없습니다.
  2. Databricks Runtime 11.3 LTS 이상을 실행하는 작업 영역의 다목적 클러스터 입니다. 다목적 클러스터를 만들려면 Compute 구성 참조를 참조하세요.
  3. Azure Databricks 작업 영역 사용자 인터페이스에 대해 잘 알고 있어야 합니다. 작업 영역 탐색을 참조하세요.
  4. Databricks Notebook을 사용하는 데 익숙합니다.
  5. 데이터를 쓸 수 있는 위치, 이 데모에서는 DBFS 루트를 예로 사용하지만 Databricks는 Unity 카탈로그로 구성된 외부 스토리지 위치를 권장합니다.

1단계. 환경 구성 및 데이터 생성기 만들기

이 자습서에서는 Azure Databricks 및 기본 작업 영역 구성에 대한 기본적인 지식이 있다고 가정합니다. 제공된 코드를 실행할 수 없는 경우 작업 영역 관리자에게 문의하여 컴퓨팅 리소스와 데이터를 쓸 수 있는 위치에 액세스할 수 있는지 확인합니다.

제공된 코드는 source 매개 변수를 사용하여 COPY INTO 데이터 원본으로 구성할 위치를 지정합니다. 작성된 대로 이 코드는 DBFS 루트의 위치를 가리킵니다. 외부 개체 스토리지 위치에 대한 쓰기 권한이 있는 경우 원본 문자열의 dbfs:/ 부분을 개체 스토리지에 대한 경로로 바꿉니다. 이 코드 블록은 이 데모를 다시 설정하기 위해 재귀 삭제도 수행하므로 프로덕션 데이터를 가리키지 않도록 하고 기존 데이터를 덮어쓰거나 삭제하지 않도록 /user/{username}/copy-into-demo 중첩 디렉터리를 유지해야 합니다.

  1. 새 SQL Notebook을 만들고 Databricks Runtime 11.3 LTS 이상을 실행하는 클러스터에 연결합니다.

  2. 다음 코드를 복사하고 실행하여 이 자습서에 사용된 스토리지 위치와 데이터베이스를 다시 설정합니다.

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. 다음 코드를 복사하고 실행하여 데이터를 임의로 생성하는 데 사용할 일부 테이블과 함수를 구성합니다.

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

2단계: 샘플 데이터를 클라우드 스토리지에 쓰기

Azure Databricks에서는 Delta Lake 이외의 데이터 서식에 자주 쓰지 않습니다. 여기에 제공된 코드는 JSON에 작성하여 다른 시스템의 결과를 개체 스토리지로 덤프할 수 있는 외부 시스템을 시뮬레이션합니다.

  1. 다음 코드를 복사하고 실행하여 원시 JSON 데이터 일괄 처리를 작성합니다.

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

3단계: COPY INTO를 사용하여 JSON 데이터를 멱등적으로 로드

COPY INTO를 사용하려면 먼저 대상 Delta Lake 테이블을 만들어야 합니다. Databricks Runtime 11.3 LTS 이상에서는 문에 CREATE TABLE 테이블 이름 이외의 다른 항목을 제공할 필요가 없습니다. 이전 버전의 Databricks Runtime의 경우 빈 테이블을 만들 때 스키마를 제공해야 합니다.

  1. 다음 코드를 복사하고 실행하여 대상 Delta 테이블을 만들고 원본에서 데이터를 로드합니다.

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

이 작업은 멱등적이므로 여러 번 실행할 수 있지만 데이터는 한 번만 로드됩니다.

4단계: 테이블 내용 미리 보기

간단한 SQL 쿼리를 실행하여 이 테이블의 내용을 수동으로 검토할 수 있습니다.

  1. 다음 코드를 복사하고 실행하여 테이블을 미리 봅니다.

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

5단계: 추가 데이터 로드 및 결과 미리 보기

2-4단계를 여러 번 다시 실행하여 원본에 임의의 원시 JSON 데이터의 새 일괄 처리를 배치하고, COPY INTO를 사용하여 이를 Delta Lake에 멱등적으로 로드하고, 결과를 미리 볼 수 있습니다. 이 단계를 순서 없이 실행하거나 여러 번 실행하여 새 데이터가 도착하지 않은 상태에서 COPY INTO를 여러 번 실행했거나 쓰인 원시 데이터에 대한 일괄 처리를 여러 번 시뮬레이션해 보세요.

6단계: 자습서 정리

이 자습서를 마친 후 더 이상 유지하지 않으려면 연결된 리소스를 정리할 수 있습니다.

  1. 다음 코드를 복사하여 실행하여 데이터베이스, 테이블을 삭제하고 모든 데이터를 제거합니다.

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 컴퓨팅 리소스를 중지하려면 클러스터 탭으로 이동하여 클러스터를 종료합니다.

추가 리소스