PySpark 기본 사항
이 문서에서는 PySpark를 사용하는 방법을 설명하는 간단한 예제를 안내합니다. 기본 Apache Spark 개념을 이해하고 컴퓨팅에 연결된 Azure Databricks Notebook 에서 명령을 실행하고 있다고 가정합니다. 샘플 데이터를 사용하여 DataFrames를 만들고, 이 데이터에 대한 행 및 열 작업을 비롯한 기본 변환을 수행하고, 여러 DataFrame을 결합하고, 이 데이터를 집계하고, 이 데이터를 시각화한 다음, 테이블 또는 파일에 저장합니다.
데이터 업로드
이 문서의 일부 예제에서는 Databricks 제공 샘플 데이터를 사용하여 DataFrames를 사용하여 데이터를 로드, 변환 및 저장하는 방법을 보여 줍니다. Databricks에 아직 없는 사용자 고유의 데이터를 사용하려는 경우 먼저 업로드하고 데이터 프레임을 만들 수 있습니다. 파일 업로드 및 Unity 카탈로그 볼륨에 파일 업로드를 사용하여 테이블 만들기 또는 수정을 참조하세요.
Databricks 샘플 데이터 정보
Databricks는 카탈로그 및 /databricks-datasets
디렉터리에 샘플 데이터를 samples
제공합니다.
- 카탈로그의 샘플 데이터에
samples
액세스하려면 형식samples.<schema-name>.<table-name>
을 사용합니다. 이 문서에서는 가상 비즈니스의samples.tpch
데이터를 포함하는 스키마의 테이블을 사용합니다. 테이블에는customer
고객에 대한 정보가 포함되어 있으며orders
해당 고객이 주문한 주문에 대한 정보가 포함되어 있습니다. - 에서 데이터를
/databricks-datasets
탐색하는 데 사용합니다dbutils.fs.ls
. Spark SQL 또는 DataFrames를 사용하여 파일 경로를 사용하여 이 위치의 데이터를 쿼리합니다. Databricks에서 제공하는 샘플 데이터에 대한 자세한 내용은 샘플 데이터 세트를 참조 하세요.
데이터 형식 가져오기
많은 PySpark 작업을 수행하려면 SQL 함수를 사용하거나 네이티브 Spark 형식과 상호 작용해야 합니다. 필요한 함수 및 형식만 직접 가져오거나 전체 모듈을 가져올 수 있습니다.
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
일부 가져온 함수는 Python 기본 제공 함수를 재정의할 수 있으므로 일부 사용자는 별칭을 사용하여 이러한 모듈을 가져오도록 선택합니다. 다음 예제에서는 Apache Spark 코드 예제에 사용되는 일반적인 별칭을 보여 줍니다.
import pyspark.sql.types as T
import pyspark.sql.functions as F
데이터 형식의 포괄적인 목록은 Spark 데이터 형식을 참조 하세요.
PySpark SQL 함수의 포괄적인 목록은 Spark Functions를 참조 하세요.
DataFrame 만들기
DataFrame을 만드는 방법에는 여러 가지가 있습니다. 일반적으로 테이블 또는 파일 컬렉션과 같은 데이터 원본에 대해 DataFrame을 정의합니다. 그런 다음 Apache Spark 기본 개념 섹션에 설명된 대로 다음과 같은 display
작업을 사용하여 실행할 변환을 트리거합니다. 이 메서드는 display
DataFrames를 출력합니다.
지정된 값을 사용하여 DataFrame 만들기
지정된 값을 사용하여 DataFrame을 만들려면 행이 createDataFrame
튜플 목록으로 표현되는 메서드를 사용합니다.
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
출력에서 열 df_children
의 데이터 형식이 자동으로 유추됩니다. 또는 스키마를 추가하여 형식을 지정할 수 있습니다. 스키마는 이름, 데이터 형식 및 null 값이 포함되어 있는지 여부를 나타내는 부울 플래그를 지정하는 것으로 구성된 StructFields
스키마를 사용하여 StructType
정의됩니다. 에서 데이터 형식을 pyspark.sql.types
가져와야 합니다.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity 카탈로그의 테이블에서 DataFrame 만들기
Unity 카탈로그의 테이블에서 DataFrame을 만들려면 형식<catalog-name>.<schema-name>.<table-name>
을 table
사용하여 테이블을 식별하는 메서드를 사용합니다. 왼쪽 탐색 모음에서 카탈로그를 클릭하여 카탈로그 탐색기를 사용하여 테이블로 이동합니다. 테이블 경로를 클릭한 다음 표 경로 복사를 선택하여 Notebook에 테이블 경로를 삽입합니다.
다음 예제에서는 테이블을 samples.tpch.customer
로드하지만 사용자 고유의 테이블에 대한 경로를 제공할 수도 있습니다.
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
업로드된 파일에서 DataFrame 만들기
Unity 카탈로그 볼륨에 업로드한 파일에서 DataFrame을 만들려면 이 속성을 사용합니다 read
. 이 메서드는 적절한 형식을 DataFrameReader
읽는 데 사용할 수 있는 값을 반환합니다. 왼쪽의 작은 사이드바에서 카탈로그 옵션을 클릭하고 카탈로그 브라우저를 사용하여 파일을 찾습니다. 이를 선택한 다음 볼륨 파일 경로 복사를 클릭합니다.
아래 예제에서는 파일에서 *.csv
읽지만 DataFrameReader
다른 많은 형식의 파일 업로드를 지원합니다. DataFrameReader 메서드를 참조하세요.
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity 카탈로그 볼륨에 대한 자세한 내용은 Unity 카탈로그 볼륨이란?을 참조하세요.
JSON 응답에서 DataFrame 만들기
REST API에서 반환된 JSON 응답 페이로드에서 DataFrame을 만들려면 Python requests
패키지를 사용하여 응답을 쿼리하고 구문 분석합니다. 패키지를 사용하려면 패키지를 가져와야 합니다. 이 예제에서는 미국 식품의약품안전청의 약물 응용 프로그램 데이터베이스의 데이터를 사용합니다.
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Databricks에서 JSON 및 기타 반구조화된 데이터를 사용하는 방법에 대한 자세한 내용은 반구조화된 데이터 모델을 참조하세요.
JSON 필드 또는 개체 선택
변환된 JSON에서 특정 필드 또는 개체를 선택하려면 표기 []
법을 사용합니다. 예를 들어 제품 배열인 필드를 선택 products
하려면 다음을 수행합니다.
display(df_drugs.select(df_drugs["products"]))
메서드 호출을 함께 연결하여 여러 필드를 트래버스할 수도 있습니다. 예를 들어 약물 응용 프로그램에서 첫 번째 제품의 브랜드 이름을 출력하려면 다음을 수행합니다.
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
파일에서 DataFrame 만들기
파일에서 DataFrame을 만드는 방법을 보여 주는 이 예제에서는 디렉터리에 CSV 데이터를 /databricks-datasets
로드합니다.
샘플 데이터 세트로 이동하려면 Databricks Utilties 파일 시스템 명령을 사용할 수 있습니다. 다음 예제에서는 다음에서 사용할 수 있는 데이터 세트를 나열하는 데 사용합니다dbutils
./databricks-datasets
display(dbutils.fs.ls('/databricks-datasets'))
또는 다음 예제와 같이 Databricks CLI 파일 시스템 명령에 액세스하는 데 사용할 %fs
수 있습니다.
%fs ls '/databricks-datasets'
파일 또는 파일 디렉터리에서 DataFrame을 만들려면 메서드의 경로를 load
지정합니다.
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
DataFrames를 사용하여 데이터 변환
데이터 프레임을 사용하면 기본 제공 메서드를 사용하여 데이터를 쉽게 변환하여 데이터를 정렬, 필터링 및 집계할 수 있습니다. 대부분의 변환은 DataFrames에서 메서드로 지정되지 않고 패키지에 spark.sql.functions
제공됩니다. Databricks Spark SQL Functions를 참조 하세요.
열 작업
Spark는 다음과 같은 많은 기본 열 작업을 제공합니다.
팁
DataFrame의 모든 열을 출력하려면 예를 들어 df_customer.columns
사용합니다columns
.
열 선택
를 사용하여 select
col
특정 열을 선택할 수 있습니다. 함수는 col
하위 코드에 pyspark.sql.functions
있습니다.
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
문자열로 정의된 식을 사용하는 expr
열을 참조할 수도 있습니다.
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
SQL 식을 허용하는 다음을 사용할 selectExpr
수도 있습니다.
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
문자열 리터럴을 사용하여 열을 선택하려면 다음을 수행합니다.
df_customer.select(
"c_custkey",
"c_acctbal"
)
특정 DataFrame에서 열을 명시적으로 선택하려면 연산자 또는 연산자를 []
.
사용할 수 있습니다. (연산자는 .
정수로 시작하는 열이나 공백이나 특수 문자가 포함된 열을 선택하는 데 사용할 수 없습니다.) 이는 일부 열의 이름이 같은 DataFrame을 조인할 때 특히 유용할 수 있습니다.
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
열 만들기
새 열을 만들려면 메서드를 withColumn
사용합니다. 다음 예제에서는 고객 계정 잔액 c_acctbal
이 초과되는지 여부에 따라 부울 값이 포함된 새 열을 만듭니다.1000
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
열 이름 바꾸기
열 이름을 바꾸려면 기존 및 새 열 이름을 허용하는 메서드를 사용합니다 withColumnRenamed
.
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
이 alias
메서드는 집계의 일부로 열의 이름을 바꾸려는 경우에 특히 유용합니다.
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
캐스트 열 형식
경우에 따라 DataFrame에서 하나 이상의 열에 대한 데이터 형식을 변경할 수 있습니다. 이렇게 하려면 메서드를 cast
사용하여 열 데이터 형식 간에 변환합니다. 다음 예제에서는 열을 참조하는 메서드를 사용하여 col
열을 정수에서 문자열 형식으로 변환하는 방법을 보여줍니다.
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
열 제거
열을 제거하려면 선택 중에 열을 생략하거나 select(*) except
메서드를 drop
사용할 수 있습니다.
df_customer_flag_renamed.drop("balance_flag_renamed")
여러 열을 한 번에 삭제할 수도 있습니다.
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
행 작업
Spark는 다음과 같은 다양한 기본 행 작업을 제공합니다.
행 필터링
행을 필터링하려면 DataFrame에서 filter
또는 where
메서드를 사용하여 특정 행만 반환합니다. 필터링할 열을 식별하려면 열로 col
계산되는 메서드 또는 식을 사용합니다.
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
여러 조건을 필터링하려면 논리 연산자를 사용합니다. 예를 들어 각각 &
사용자와 |
OR
조건을 사용하도록 설정합니다AND
. 다음 예제에서는 행이 c_nationkey
같고 c_acctbal
보다 1000
큰 행을 20
필터링합니다.
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
중복 행 제거
행을 중복 제거하려면 고유한 행만 반환하는
df_unique = df_customer.distinct()
null 값 처리
null 값을 처리하려면 메서드를 사용하여 null 값이 포함된 행을 na.drop
삭제합니다. 이 메서드를 사용하면 null 값 또는 all
null 값이 포함된 any
행을 삭제할지 여부를 지정할 수 있습니다.
null 값을 삭제하려면 다음 예제 중 하나를 사용합니다.
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
대신 모든 null 값이 포함된 행만 필터링하려면 다음을 사용합니다.
df_customer_no_nulls = df_customer.na.drop("all")
아래와 같이 이를 지정하여 열의 하위 집합에 적용할 수 있습니다.
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
누락된 값을 채우려면 메서드를 fill
사용합니다. 모든 열 또는 열의 하위 집합에 적용하도록 선택할 수 있습니다. 아래 예제에서는 계정 잔액에 대해 null 값이 있는 계정 잔액 c_acctbal
이 채워 0
집니다.
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
문자열을 다른 값으로 바꾸려면 메서드를 replace
사용합니다. 아래 예제에서는 빈 주소 문자열이 다음 단어 UNKNOWN
로 바뀝니다.
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
행 추가
행을 추가하려면 메서드를 union
사용하여 새 DataFrame을 만들어야 합니다. 다음 예제에서는 이전에 만든 DataFrame df_that_one_customer
이 df_filtered_customer
결합되어 세 명의 고객이 있는 DataFrame을 반환합니다.
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
참고 항목
테이블에 작성한 다음 새 행을 추가하여 DataFrame을 결합할 수도 있습니다. 프로덕션 워크로드의 경우 대상 테이블에 대한 데이터 원본을 증분 처리하면 데이터의 크기가 커짐에 따라 대기 시간과 컴퓨팅 비용을 크게 줄일 수 있습니다. Databricks 레이크하우스로 데이터 수집을 참조 하세요.
행 정렬
Important
정렬은 대규모로 비용이 많이 들 수 있으며 정렬된 데이터를 저장하고 Spark를 사용하여 데이터를 다시 로드하는 경우 순서가 보장되지 않습니다. 정렬을 사용할 때 의도적인지 확인합니다.
하나 이상의 열을 기준으로 행을 정렬하려면 또는 orderBy
메서드를 sort
사용합니다. 기본적으로 이러한 메서드는 오름차순으로 정렬됩니다.
df_customer.orderBy(col("c_acctbal"))
내림차순으로 필터링하려면 다음을 사용합니다 desc
.
df_customer.sort(col("c_custkey").desc())
다음 예제에서는 두 열을 정렬하는 방법을 보여 줍니다.
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
DataFrame이 정렬되면 반환할 행 수를 제한하려면 이 메서드를 limit
사용합니다. 다음 예제에서는 상위 10
결과만 표시합니다.
display(df_sorted.limit(10))
데이터 프레임 조인
둘 이상의 DataFrame을 조인하려면 이 메서드를 join
사용합니다. 데이터 프레임을 조인 형식(조인 형식) 및 on
(조인을 기반으로 하는 열) 매개 변수에 조 how
인하는 방법을 지정할 수 있습니다. 일반적인 조인 유형은 다음과 같습니다.
inner
: 조인 유형 기본값으로, DataFrame에서 매개 변수에 대한on
일치 항목이 있는 행만 유지하는 DataFrame을 반환합니다.left
: 첫 번째 지정된 DataFrame의 모든 행과 첫 번째 데이터 프레임과 일치하는 두 번째 지정된 DataFrame의 행만 유지합니다.outer
: 외부 조인은 일치 항목에 관계없이 두 DataFrame의 모든 행을 유지합니다.
조인에 대한 자세한 내용은 Azure Databricks에서 조인 작업을 참조 하세요. PySpark에서 지원되는 조인 목록은 DataFrame 조인을 참조 하세요.
다음 예제에서는 DataFrame의 각 행이 DataFrame의 orders
해당 행과 조인되는 단일 DataFrame을 customers
반환합니다. 내부 조인은 모든 주문이 정확히 하나의 고객에 해당하기 때문에 사용됩니다.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
여러 조건에 조인하려면 각각 부울 연산자(예: &
|
및 지정 AND
및 OR
)를 사용합니다. 다음 예제에서는 다음보다 500,000
큰 행에만 필터링하는 추가 조건을 추가합니다o_totalprice
.
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
데이터 집계
SQL에서와 유사하게 GROUP BY
DataFrame에서 데이터를 집계하려면 그룹 agg
화할 열을 지정하는 방법과 집계를 지정하는 메서드를 사용합니다groupBy
. 를 pyspark.sql.functions
min
max
sum
비롯한 avg
일반적인 집계를 가져옵니다. 다음 예제에서는 시장 부문별 평균 고객 잔액을 보여 줍니다.
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
일부 집계는 계산을 트리거하는 작업입니다. 이 경우 다른 작업을 사용하여 결과를 출력할 필요가 없습니다.
DataFrame의 행 수를 계산하려면 다음 메서드를 count
사용합니다.
df_customer.count()
연결 호출
DataFrames를 변환하는 메서드는 DataFrames를 반환하고, Spark는 작업이 호출될 때까지 변환에 대해 동작하지 않습니다. 이 지연 평가 는 편의성과 가독성을 위해 여러 메서드를 연결할 수 있습니다. 다음 예제에서는 필터링, 집계 및 순서를 연결하는 방법을 보여줍니다.
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
DataFrame 시각화
Notebook에서 DataFrame을 시각화하려면 DataFrame의 왼쪽 위에 있는 표 옆에 있는 기호를 클릭한 + 다음 시각화를 선택하여 DataFrame에 따라 하나 이상의 차트를 추가합니다. 시각화에 대한 자세한 내용은 Databricks Notebook의 시각화를 참조 하세요.
display(df_order)
추가 시각화를 수행하기 위해 Databricks는 Spark용 pandas API를 사용하는 것이 좋습니다. Spark .pandas_api()
DataFrame에 대한 해당 pandas API로 캐스팅할 수 있습니다. 자세한 내용은 Spark의 Pandas API를 참조하세요.
데이터 저장
데이터를 변환한 후에는 메서드를 사용하여 DataFrameWriter
데이터를 저장할 수 있습니다. 이러한 메서드의 전체 목록은 DataFrameWriter에서 찾을 수 있습니다. 다음 섹션에서는 DataFrame을 테이블 및 데이터 파일 컬렉션으로 저장하는 방법을 보여 줍니다.
DataFrame을 테이블로 저장
DataFrame을 Unity 카탈로그에 테이블로 저장하려면 메서드를 write.saveAsTable
사용하고 형식 <catalog-name>.<schema-name>.<table-name>
으로 경로를 지정합니다.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
DataFrame을 CSV로 작성
서식을 지정할 DataFrame을 *.csv
작성하려면 형식 및 옵션을 지정하는 메서드를 사용합니다 write.csv
. 기본적으로 지정된 경로에 데이터가 있으면 쓰기 작업이 실패합니다. 다음 모드 중 하나를 지정하여 다른 작업을 수행할 수 있습니다.
overwrite
대상 경로의 모든 기존 데이터를 DataFrame 콘텐츠로 덮어씁니다.append
는 데이터 프레임의 콘텐츠를 대상 경로의 데이터에 추가합니다.ignore
대상 경로에 데이터가 있으면 자동으로 쓰기가 실패합니다.
다음 예제에서는 DataFrame 콘텐츠를 CSV 파일로 덮어쓰는 방법을 보여 줍니다.
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
다음 단계
Databricks에서 더 많은 Spark 기능을 활용하려면 다음을 참조하세요.