다음을 통해 공유


Get COPY INTO 사용하여 데이터를 로드하기 시작했습니다.

COPY INTO SQL 명령을 사용하면 파일 위치에서 델타 테이블 table로 데이터를 로드할 수 있습니다. 이는 재시도 가능하며 멱등성인 작업입니다. 이미 로드된 원본 위치의 파일은 건너뜁니다.

COPY INTO 에서는 다음과 같은 기능을 제공합니다.

  • S3, ADLS Gen2, ABFS, GCS 및 Unity Catalogvolumes포함하여 클라우드 스토리지에서 쉽게 구성할 수 있는 파일 또는 디렉터리 필터입니다.
  • CSV, JSON, XML, Avro, ORC, Parquet, 텍스트 및 이진 파일 등 여러 소스 파일 형식 지원
  • 기본적으로 정확히 한 번(idempotent) 파일 처리
  • 타깃 tableschema 추론, 매핑, 병합 및 진화

참고 항목

더 확장 가능하고 강력한 파일 수집 경험을 위해 Databricks는 SQL 사용자가 스트리밍 tables를 활용할 것을 권장합니다. Databricks SQL에서 스트리밍 을 사용하여 데이터를 로드하는 방법에 대해 을(를) 참조하세요.

Warning

COPY INTO 는 삭제 벡터에 대한 작업 영역 설정을 적용합니다. 사용하도록 설정된 경우, SQL 웨어하우스에서 COPY INTO이 실행되거나 Databricks Runtime 14.0 이상을 사용하는 컴퓨팅에서 실행될 때 대상 table의 삭제 벡터가 활성화됩니다. 삭제 벡터가 활성화되면 삭제 벡터는 Databricks Runtime 11.3 LTS 및 그 이하에서 table에 대한 쿼리를 차단합니다. 삭제 벡터란?자동 사용 삭제 벡터를 참조하세요.

요구 사항

계정 관리자는 수집을 위한 데이터 액세스 구성의 단계를 따라 클라우드 개체 스토리지의 데이터에 대한 액세스를 구성해야 사용자가 데이터를 COPY INTO로드할 수 있습니다.

예시: 스키마 없는 Delta Lake table에 데이터 로드

참고 항목

이 기능은 Databricks Runtime 11.3 LTS 이상에서 사용할 수 있습니다.

빈 자리 표시자 델타 tables을 생성하여 schema을 나중에 COPY INTO 명령에서 추론할 수 있도록, COPY_OPTIONS에서 mergeSchematrue로 설정할 수 있습니다.

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

위의 SQL 문은 idempotent하며 Delta table에 데이터를 정확히 한 번 수집하도록 실행 일정에 추가할 수 있습니다.

참고 항목

COPY INTO외부에서는 빈 Delta table을 사용할 수 없습니다. INSERT INTOMERGE INTO은 스키마 없는 델타 tables로 데이터를 쓸 수 있도록 지원되지 않습니다. 데이터가 COPY INTO로 table에 삽입되면 table는 쿼리할 수 있는 상태가 됩니다.

참조하세요. COPY INTO대한 대상 tables 만들기.

예: Setschema 및 Delta Lake table에 데이터를 로드

다음 예제에서는 Delta table를 만든 다음 COPY INTO SQL 명령을 사용하여 Databricks 데이터 세트에서 샘플 데이터를 table로 로드하는 방법을 보여줍니다. Azure Databricks 클러스터에 연결된 Notebook에서 예제 Python, R, Scala 또는 SQL 코드를 실행할 수 있습니다. Databricks SQLSQL 웨어하우스와 연결된 쿼리에서 SQL 코드를 실행할 수도 있습니다.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

정리하려면 다음 코드를 실행하여 table삭제합니다.

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

메타데이터 파일 정리

VACUUM 실행하여 Databricks Runtime 15.2 이상에서 COPY INTO 만든 참조되지 않은 메타데이터 파일을 정리할 수 있습니다.

참조

추가 리소스

  • Unity Catalogvolumes 또는 외부 위치에서 COPY INTO을 사용하여 데이터 로드

  • 서비스 주체를 사용하여 으로 데이터 로드

  • 동일한 Delta 에 대한 여러 작업의 예제를 비롯한 일반적인 사용 패턴은 를 사용하여 공통 데이터 로드 패턴 를 참고하십시오.