什麼是使用者定義函式 (UDF)?
使用者定義函式 (UDF) 可讓您重複使用和共用程式碼,以擴充 Azure Databricks 上的內建功能。 使用UDF來執行特定工作,例如複雜的計算、轉換或自定義資料操作。
注意
在具有共用存取模式的叢集上,Databricks Runtime 13.3 LTS 和更新版本支援 Python 純量 UDF,而 Databricks Runtime 14.2 和更新版本支援 Scala UDF。
Python 純量 UDF 可以使用 Databricks Runtime 13.3 LTS 和更新版本中的 SQL 語法,在 Unity 目錄中註冊。 請參閱 Unity 目錄中的使用者定義函式 (UDF)。
何時應該使用UDF?
針對難以使用內建 Apache Spark 函式表達的邏輯使用 UDF。 內建的 Apache Spark 函式已針對分散式處理進行優化,且通常會大規模提供更佳的效能。 如需詳細資訊,請參閱函式。
Databricks 建議 UDF 進行臨機操作查詢、手動數據清理、探勘數據分析,以及小型到中型數據集的作業。 UDF 的常見使用案例包括數據加密和解密、哈希、JSON 剖析和驗證。
針對非常大型數據集和定期或持續執行的任何工作負載,包括 ETL 作業和串流作業,使用 Apache Spark 方法。
已註冊和會話範圍的UDF
使用 SQL 建立的 UDF 會在 Unity 目錄中註冊,並具有相關聯的許可權,而在筆記本中建立的 UDF 是以會話為基礎,且範圍限定為目前的 SparkSession。
您可以使用 Azure Databricks 所支援的任何語言來定義和存取作業階段型 UDF。 UDF 可以是純量或非純量。
注意
目前只有在 Unity 目錄中註冊的 SQL 和 Python 純量 UDF 可在 DBSQL 中使用。
純量 UDF
純量 UDF 會在單一數據列上運作,並傳回每個數據列的單一值。 下列範例會使用純量 UDF 來計算資料行中 name
每個名稱的長度,並在新的 name_length
資料行中新增值:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
若要使用 PySpark 在 Databricks 筆記本中實作這項作業:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
如需詳細資訊,請參閱 Unity 目錄中的使用者定義函式和使用者定義的純量函式 - Python。
使用者定義彙總函式 (UDAF)
使用者定義的聚合函數 (UDAF) 會在多個數據列上運作,並傳回單一匯總結果。 在下列範例中,會定義匯總分數的 UDAF。
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
請參閱適用於 Python 和使用者定義聚合函數的 pandas 用戶定義函式 - Scala。
Python 使用者定義資料表函式 (UDDF)
重要
這項功能處於公開預覽狀態。
注意
Databricks Runtime 14.3 LTS 和更新版本提供 Python UDF。
Python 使用者定義數據表函式 (UDF) 可以針對每個輸入數據列傳回多個數據列和數據行。 在下列範例中,分數數據行中的每個值都會對應至類別清單。 UDTF 的定義是將逗號分隔清單分割成多個數據列。 請參閱 Python 使用者定義資料表函式 (UDF)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
效能考量
- 內建函式 和 SQL UDF 是可用的最有效率的選項。
- Scala UDF 通常較快,因為它們在 Java 虛擬機 (JVM) 內執行,並避免將數據移入和移出 JVM 的額外負荷。
- Python UDF 和 Pandas UDF 通常比 Scala UDF 慢,因為它們需要將數據串行化並移出 JVM 到 Python 解釋器。 Pandas UDF 的速度比 Python UDF 快 100 倍,因為它們使用 Apache Arrow 來降低串行化成本。