자습서: Spark SQL을 사용하여 COPY INTO
Databricks는 수천 개의 파일이 포함된 데이터 원본에 대한 증분 및 대량 데이터 로드에 COPY INTO 명령을 사용할 것을 권장합니다. Databricks는 고급 사용 사례에 자동 로더를 사용하는 것이 좋습니다.
이 자습서에서는 COPY INTO
명령을 사용하여 클라우드 개체 스토리지의 데이터를 Azure Databricks 작업 영역의 테이블로 로드합니다.
요구 사항
- Azure 구독, 해당 구독의 Azure Databricks 작업 영역 및 해당 작업 영역의 클러스터가 필요합니다. 이러한 항목을 만들려면 빠른 시작: Azure Portal을 사용하여 Azure Databricks 작업 영역에서 Spark 작업 실행을 참조하세요. 이 빠른 시작을 따르는 경우 Spark SQL 작업 실행 섹션의 지침을 따를 필요가 없습니다.
- Databricks Runtime 11.3 LTS 이상을 실행하는 작업 영역의 다목적 클러스터 입니다. 다목적 클러스터를 만들려면 Compute 구성 참조를 참조하세요.
- Azure Databricks 작업 영역 사용자 인터페이스에 대해 잘 알고 있어야 합니다. 작업 영역 탐색을 참조하세요.
- Databricks Notebook을 사용하는 데 익숙합니다.
- 데이터를 쓸 수 있는 위치, 이 데모에서는 DBFS 루트를 예로 사용하지만 Databricks는 Unity 카탈로그로 구성된 외부 스토리지 위치를 권장합니다.
1단계. 환경 구성 및 데이터 생성기 만들기
이 자습서에서는 Azure Databricks 및 기본 작업 영역 구성에 대한 기본적인 지식이 있다고 가정합니다. 제공된 코드를 실행할 수 없는 경우 작업 영역 관리자에게 문의하여 컴퓨팅 리소스와 데이터를 쓸 수 있는 위치에 액세스할 수 있는지 확인합니다.
제공된 코드는 source
매개 변수를 사용하여 COPY INTO
데이터 원본으로 구성할 위치를 지정합니다. 작성된 대로 이 코드는 DBFS 루트의 위치를 가리킵니다. 외부 개체 스토리지 위치에 대한 쓰기 권한이 있는 경우 원본 문자열의 dbfs:/
부분을 개체 스토리지에 대한 경로로 바꿉니다. 이 코드 블록은 이 데모를 다시 설정하기 위해 재귀 삭제도 수행하므로 프로덕션 데이터를 가리키지 않도록 하고 기존 데이터를 덮어쓰거나 삭제하지 않도록 /user/{username}/copy-into-demo
중첩 디렉터리를 유지해야 합니다.
새 SQL Notebook을 만들고 Databricks Runtime 11.3 LTS 이상을 실행하는 클러스터에 연결합니다.
다음 코드를 복사하고 실행하여 이 자습서에 사용된 스토리지 위치와 데이터베이스를 다시 설정합니다.
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
다음 코드를 복사하고 실행하여 데이터를 임의로 생성하는 데 사용할 일부 테이블과 함수를 구성합니다.
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
2단계: 샘플 데이터를 클라우드 스토리지에 쓰기
Azure Databricks에서는 Delta Lake 이외의 데이터 서식에 자주 쓰지 않습니다. 여기에 제공된 코드는 JSON에 작성하여 다른 시스템의 결과를 개체 스토리지로 덤프할 수 있는 외부 시스템을 시뮬레이션합니다.
다음 코드를 복사하고 실행하여 원시 JSON 데이터 일괄 처리를 작성합니다.
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
3단계: COPY INTO를 사용하여 JSON 데이터를 멱등적으로 로드
COPY INTO
를 사용하려면 먼저 대상 Delta Lake 테이블을 만들어야 합니다. Databricks Runtime 11.3 LTS 이상에서는 문에 CREATE TABLE
테이블 이름 이외의 다른 항목을 제공할 필요가 없습니다. 이전 버전의 Databricks Runtime의 경우 빈 테이블을 만들 때 스키마를 제공해야 합니다.
다음 코드를 복사하고 실행하여 대상 Delta 테이블을 만들고 원본에서 데이터를 로드합니다.
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
이 작업은 멱등적이므로 여러 번 실행할 수 있지만 데이터는 한 번만 로드됩니다.
4단계: 테이블 내용 미리 보기
간단한 SQL 쿼리를 실행하여 이 테이블의 내용을 수동으로 검토할 수 있습니다.
다음 코드를 복사하고 실행하여 테이블을 미리 봅니다.
-- Review updated table SELECT * FROM user_ping_target
5단계: 추가 데이터 로드 및 결과 미리 보기
2-4단계를 여러 번 다시 실행하여 원본에 임의의 원시 JSON 데이터의 새 일괄 처리를 배치하고, COPY INTO
를 사용하여 이를 Delta Lake에 멱등적으로 로드하고, 결과를 미리 볼 수 있습니다. 이 단계를 순서 없이 실행하거나 여러 번 실행하여 새 데이터가 도착하지 않은 상태에서 COPY INTO
를 여러 번 실행했거나 쓰인 원시 데이터에 대한 일괄 처리를 여러 번 시뮬레이션해 보세요.
6단계: 자습서 정리
이 자습서를 마친 후 더 이상 유지하지 않으려면 연결된 리소스를 정리할 수 있습니다.
다음 코드를 복사하여 실행하여 데이터베이스, 테이블을 삭제하고 모든 데이터를 제거합니다.
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
컴퓨팅 리소스를 중지하려면 클러스터 탭으로 이동하여 클러스터를 종료합니다.
추가 리소스
- COPY INTO 참조 문서