사용자 정의 스칼라 함수 - Python
이 문서에는 Python UDF(사용자 정의 함수) 예제가 포함되어 있습니다. UDF를 등록하는 방법, UDF를 호출하는 방법을 보여 주고 Spark SQL에서 하위 식의 평가 순서에 대한 주의 사항을 제공합니다.
Databricks Runtime 14.0 이상에서는 Python UDF(사용자 정의 table 함수)를 사용하여 스칼라 values대신 전체 관계를 반환하는 함수를 등록할 수 있습니다. Python 사용자 정의 함수(UDTF) table참조하세요.
참고 항목
Databricks Runtime 12.2 LTS 이하에서는 공유 액세스 모드를 사용하는 Unity Catalog 컴퓨팅에서 Python UDF 및 Pandas UDF가 지원되지 않습니다. 스칼라 Python UDF 및 Pandas UDF는 모든 액세스 모드에 대해 Databricks Runtime 13.3 LTS 이상에서 지원됩니다.
Databricks Runtime 13.3 LTS 이상에서는 SQL 구문을 사용하여 Unity Catalog에 스칼라 Python UDF를 등록할 수 있습니다. 사용자 정의 함수(UDF) 을 Unity Catalog에서 참조하세요.
UDF로 함수 등록
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
필요에 따라 UDF의 반환 형식을 set 수 있습니다. 기본 반환 형식은 StringType
입니다.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL에서 UDF 호출
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrames에서 UDF 사용
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
또는 주석 구문을 사용하여 동일한 UDF를 선언할 수 있습니다.
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
평가 순서 및 Null 검사
Spark SQL(SQL과 DataFrame 및 데이터 세트 API 포함)은 하위 식의 평가 순서를 보장하지 않습니다. 특히 연산자 또는 함수 입력이 반드시 왼쪽에서 오른쪽 또는 다른 고정 순서로 평가되는 것은 아닙니다. 예를 들어 논리적 AND
및 OR
식에는 왼쪽에서 오른쪽 “단락” 의미 체계가 없습니다.
따라서 쿼리 최적화 및 계획 중에 식과 절의 순서가 변경될 수 있으므로 부울 식의 부작용 또는 평가 순서와 WHERE
및 HAVING
절의 순서에 의존하는 것은 위험합니다. 특히 UDF가 SQL의 단락 의미 체계를 null 검사에 사용하는 경우 UDF를 호출하기 전에 Null 검사가 수행된다는 보장이 없습니다. 예를 들면 다음과 같습니다.
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
여기서 WHERE
절은 Null을 필터링한 후 strlen
UDF가 호출되도록 보장하지 않습니다.
적절한 Null 검사를 수행하려면 다음 중 하나를 수행하는 것이 좋습니다.
- UDF 자체가 Null을 인식하도록 설정하고 UDF 내에서 Null 검사를 수행합니다.
-
IF
또는CASE WHEN
식을 사용하여 Null 검사를 수행하고 조건부 분기에서 UDF를 호출합니다.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
제한 사항
- 공유 클러스터 또는 서버리스 컴퓨팅에서 PySpark UDF는 Databricks Runtime 14.2 및 그 이하 버전에서 모듈을 가져오기 위해 Git 폴더, 작업 영역 파일 또는 UC Volumes에 접근할 수 없습니다.