다음을 통해 공유


pandas 사용자 정의 함수

벡터화된 UDF라고도 하는 pandas UDF(사용자 정의 함수)는 Apache Arrow를 사용하여 데이터를 전송하고 pandas를 사용하여 데이터를 사용하는 사용자 정의 함수입니다. pandas UDF는 벡터화된 작업을 허용하여 한 번에 한 행을 실행하는 Python UDF에 비해 성능을 최대 100배까지 높일 수 있습니다.

배경 정보는 블로그 게시물 Apache Spark 3.0의 예정된 릴리스의 New Pandas UDF 및 Python 유형 힌트를 참조하세요.

키워드 pandas_udf를 데코레이터로 사용하여 pandas UDF를 정의하고 Python 형식 힌트로 함수를 래핑합니다. 이 문서에서는 다양한 유형의 pandas UDF에 대해 설명하고 형식 힌트와 함께 pandas UDF를 사용하는 방법을 보여 줍니다.

Series to Series UDF

Series to Series pandas UDF를 사용하여 스칼라 작업을 벡터화합니다. selectwithColumn와 같은 API와 함께 사용할 수 있습니다.

Python 함수는 pandas Series를 입력으로 사용하고 동일한 길이의 pandas Series를 반환해야 하며 Python 형식 힌트에서 이를 지정해야 합니다. Spark는 열을 일괄 처리로 분할하고 각 일괄 처리된 함수를 데이터의 하위 집합으로 호출한 다음 결과를 연결하여 pandas UDF를 실행합니다.

다음 예제에서는 2개 열의 곱을 계산하는 pandas UDF를 만드는 방법을 보여 줍니다.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator of Series to Iterator of Series UDF

반복기 UDF는 다음을 제외한 모든 면에서 스칼라 pandas UDF와 동일합니다.

  • Python 함수
    • 단일 입력 일괄 처리 대신 일괄 처리의 반복기를 입력으로 사용합니다.
    • 단일 출력 일괄 처리 대신 출력 일괄 처리의 반복기를 반환합니다.
  • 반복기의 전체 출력 길이는 전체 입력의 길이와 같아야 합니다.
  • 래핑된 pandas UDF는 단일 Spark 열을 입력으로 사용합니다.

Python 형식 힌트를 Iterator[pandas.Series] ->Iterator[pandas.Series]로 지정해야 합니다.

이 pandas UDF는 UDF 실행 시 일부 상태를 초기화해야 하는 경우(예: 모든 입력 일괄 처리에 유추를 적용하기 위해 기계 학습 모델 파일을 로드하는 경우) 유용합니다.

다음 예제에서는 반복기 지원을 사용하여 pandas UDF를 만드는 방법을 보여 줍니다.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator of multiple Series to Iterator of Series UDF

Iterator of multiple Series to Iterator of Series UDF는 Iterator of Series to Iterator of Series UDF와 특성과 제한 사항이 유사합니다. 지정된 함수는 일괄 처리 반복기를 사용하고 일괄 처리 반복기를 출력합니다. UDF 실행 시 일부 상태를 초기화해야 하는 경우에도 유용합니다.

차이점은 다음과 같습니다.

  • 기본 Python 함수는 pandas Series 튜플의 반복기를 사용합니다.
  • 래핑된 pandas UDF는 여러 Spark 열을 입력으로 사용합니다.

형식 힌트를 Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series]로 지정합니다.

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Series to scalar UDF

Series to scalar pandas UDF는 Spark 집계 함수와 유사합니다. Series to scalar pandas UDF는 하나 이상의 pandas Series에서 스칼라 값으로의 집계를 정의합니다. 여기서 각 pandas Series는 Spark 열을 나타냅니다. API(예: select, withColumn, groupBy.aggpyspark.sql.Window)와 함께 Series to scalar pandas UDF를 사용합니다.

형식 힌트를 pandas.Series, ... ->Any로 나타냅니다. 반환 형식은 기본 데이터 형식이어야 하며 반환된 스칼라는 Python 기본 형식(예: int 또는 float) 또는 NumPy 데이터 형식(예: numpy.int64 또는 numpy.float64)입니다. Any은 가능하면 특정 스칼라 형식이어야 합니다.

이 유형의 UDF는 부분 집계를 지원하지 않으며 각 그룹의 모든 데이터가 메모리에 로드됩니다.

다음 예제에서는 이 유형의 UDF를 사용하여 select, groupBy, window 작업으로 평균을 계산하는 방법을 보여 줍니다.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

자세한 사용법은 pyspark.sql.functions.pandas_udf를 참조하세요.

사용

화살표 일괄 처리 크기 설정

참고 항목

이 구성은 공유 액세스 모드 및 Databricks Runtime 13.3 LTS에서 14.2까지 구성된 컴퓨팅에 영향을 주지 않습니다.

Spark의 데이터 파티션은 화살표 레코드 일괄 처리로 변환되어 일시적으로 JVM의 메모리 사용량이 높아질 수 있습니다. 메모리 부족 예외를 방지하려면 spark.sql.execution.arrow.maxRecordsPerBatch 구성을 각 일괄 처리의 최대 행 수를 결정하는 정수로 설정하여 화살표 레코드 일괄 처리의 크기를 조정할 수 있습니다. 기본값은 일괄 처리당 10,000개의 레코드입니다. 열 수가 크면 그에 따라 값을 조정해야 합니다. 이 제한을 사용하면 각 데이터 파티션은 1개 이상의 레코드 일괄 처리로 나뉘어 처리됩니다.

표준 시간대 의미 체계가 있는 타임스탬프

Spark는 내부적으로 타임스탬프를 UTC 값으로 저장하고, 표준 시간대를 지정하지 않고 가져온 타임스탬프 데이터는 마이크로초 해상도를 사용하여 현지 시간 기준 UTC로 변환됩니다.

타임스탬프 데이터를 내보내거나 Spark에 표시하면 세션 표준 시간대를 사용하여 타임스탬프 값을 지역화합니다. 세션 표준 시간대는 spark.sql.session.timeZone으로 설정되며 기본적으로 JVM 시스템 로컬 표준 시간대로 설정됩니다. pandas는 나노초 해상도의 datetime64 형식인 datetime64[ns]를 열 단위의 선택적 시간대와 함께 사용합니다.

타임스탬프 데이터가 Spark에서 pandas로 전송되면 나노초로 변환되고 각 열이 Spark 세션 표준 시간대로 변환된 다음 해당 표준 시간대로 지역화되어 표준 시간대를 제거하고 값을 현지 시간으로 표시합니다. 이는 타임스탬프 열로 toPandas() 또는 pandas_udf를 호출할 때 발생합니다.

타임스탬프 데이터가 pandas에서 Spark로 전송되면 UTC 마이크로초로 변환됩니다. 이는 pandas DataFrame으로 createDataFrame을 호출하거나 pandas UDF에서 타임스탬프를 반환할 때 발생합니다. Spark에 예상된 형식의 데이터가 있는지 확인하기 위해 변환이 자동으로 수행되므로 변환을 직접 수행할 필요가 없습니다. 모든 나노초 값이 잘립니다.

표준 UDF는 pandas 타임스탬프와는 다른 Python 날짜/시간 개체로 타임스탬프 데이터를 로드합니다. 최상의 성능을 위해 pandas UDF에서 타임스탬프를 사용할 때 pandas 시계열 기능을 사용하는 것이 좋습니다. 자세한 내용은 시계열/날짜 기능을 참조하세요.

예제 Notebook

다음 Notebook에서는 pandas UDF를 사용하여 달성할 수 있는 성능 향상을 보여 줍니다.

pandas UDF 벤치마크 Notebook

Notebook 가져오기