Funzioni Pandas definite dall'utente
Una funzione definita dall'utente (UDF) pandas, nota anche come funzione definita dall'utente vettorializzata, è una funzione definita dall'utente che usa Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le UDF pandas consentono operazioni vettoriali che possono aumentare le prestazioni fino a 100 volte rispetto alle UDF Python riga per riga.
Per informazioni di base, vedere il post di blog Nuove UDF pandas e hint per i tipi Python nella prossima versione di Apache Spark 3.0.
È possibile definire una funzione pandas definita dall'utente usando la parola chiave pandas_udf
come elemento Decorator ed eseguire il wrapping della funzione con un hint per il tipo Python.
Questo articolo descrive i diversi tipi di UDF pandas e illustra come usare le UDF pandas con hint di tipo.
UDF da serie a serie
Per vettorizzare le operazioni scalari, usare una UDF pandas da serie a serie.
È possibile utilizzarle con API come select
e withColumn
.
La funzione Python deve accettare una serie pandas come input e restituire una serie pandas con la stessa lunghezza ed è necessario specificarli negli hint per il tipo Python. Spark esegue una pandas UDF dividendo columns in batch, chiamando la funzione su ciascun batch come parte dei dati, quindi concatenando i risultati.
Nell'esempio seguente viene illustrato come creare una UDF di pandas che calcola il prodotto di 2 columns.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
UDF da iteratore di serie a iteratore di serie
Una UDF iteratore è identica a una funzione UDF pandas scalare, ad eccezione di:
- La funzione Python
- Accetta un iteratore di batch anziché un singolo batch di input come input.
- Restituisce un iteratore di batch di output anziché un singolo batch di output.
- La lunghezza dell'intero output nell'iteratore deve corrispondere alla lunghezza dell'intero input.
- La Pandas UDF incapsulata accetta un singolo input column di Spark.
È necessario specificare l'hint per il tipo Python come Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Questa UDF pandas è utile quando l'esecuzione UDF richiede l'inizializzazione di uno stato, ad esempio il caricamento di un file del modello di Machine Learning per applicare l'inferenza a ogni batch di input.
L'esempio seguente illustra come creare una UDF pandas con supporto iteratore.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
UDF da iteratore di più serie a iteratore di serie
Una UDF da iteratore di più serie a iteratore di serie presenta caratteristiche e restrizioni simili alla UDF da iteratore di serie a iteratore di serie. La funzione specificata accetta un iteratore di batch e restituisce un iteratore di batch. È utile anche quando l'esecuzione della funzione definita dall'utente richiede l'inizializzazione di uno stato.
Le differenze sono le seguenti:
- La funzione Python sottostante accetta un iteratore di una tupla di serie pandas.
- La funzione definita dall'utente pandas di cui è stato eseguito il wrapping accetta più Spark columns come input.
Specificare gli hint di tipo come Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
UDF da serie a scalare
Le UDF pandas da serie a scalari sono simili alle funzioni di aggregazione di Spark.
Una funzione UDF (User Defined Function) di pandas da Serie a scalare definisce un'aggregazione da una o più Serie pandas a un valore scalare, where ogni Serie pandas rappresenta una sotto-unità di Spark column.
Si usa una serie per scalare la funzione definita dall'utente pandas con API come select
, withColumn
, groupBy.agg
e pyspark.sql.Window.
Si esprime l'hint di tipo come pandas.Series, ...
->Any
. Il tipo restituito deve essere un tipo di dati primitivo e lo scalare restituito può essere un tipo primitivo Python, ad esempio int
o float
o un tipo di dati NumPy, ad esempio numpy.int64
o numpy.float64
.
Any
deve essere idealmente un tipo scalare specifico.
Questo tipo di UDF non supporta l'aggregazione parziale e tutti i dati per ogni gruppo vengono caricati in memoria.
L'esempio seguente mostra come usare questo tipo di UDF per calcolare la media con le operazioni select
, groupBy
e window
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.functions.pandas_udf.
Utilizzo
Impostazione delle dimensioni del batch Arrow
Nota
Questa configurazione non ha alcun impatto sul calcolo configurato con la modalità di accesso condiviso e Databricks Runtime LTS da 13.3 a 14.2.
Le partizioni di dati in Spark vengono convertite in batch di record Arrow, che possono causare temporaneamente un utilizzo elevato della memoria nella JVM. Per evitare possibili eccezioni di memoria insufficiente, è possibile modificare le dimensioni dei batch di record Arrow impostando la configurazione spark.sql.execution.arrow.maxRecordsPerBatch
su un numero intero che determina il numero massimo di righe per ogni batch. Il valore predefinito è 10.000 record per batch. Se il numero di columns è elevato, il valore deve essere regolato di conseguenza. Usando questo limit, ogni dato partition è suddiviso in uno o più gruppi di record per l'elaborazione.
Timestamp con semantica fuso orario
Spark archivia internamente i timestamp come UTC values, e i dati timestamp inseriti senza un fuso orario specificato vengono convertiti all'ora locale in UTC con risoluzione al microsecondo.
Quando i dati timestamp vengono esportati o visualizzati in Spark, il fuso orario della sessione viene usato per localizzare il timestamp values. Il fuso orario della sessione è impostato a set con la configurazione spark.sql.session.timeZone
e viene impostato per impostazione predefinita sul fuso orario locale del sistema JVM. Pandas utilizza un tipo di datetime64
con risoluzione a nanosecondi, datetime64[ns]
, con fuso orario facoltativo su basecolumn.
Quando i dati timestamp vengono trasferiti da Spark a pandas, vengono convertiti in nanosecondi e ogni column viene adattato al fuso orario della sessione Spark. Successivamente, viene localizzato in quel fuso orario, eliminando il fuso orario e visualizzando values come ora locale. Ciò si verifica quando si chiama toPandas()
o pandas_udf
con timestamp columns.
Quando i dati timestamp vengono trasferiti da pandas a Spark, vengono convertiti in microsecondi UTC. Ciò si verifica quando si chiama createDataFrame
con un dataframe pandas o quando si restituisce un timestamp da una UDF pandas. Queste conversioni vengono eseguite automaticamente per garantire che Spark abbia dati nel formato previsto, quindi non è necessario eseguire manualmente alcuna di queste conversioni. Qualsiasi nanosecondo values viene troncato.
Una funzione definita dall'utente standard carica i dati di timestamp come oggetti datetime Python, che è diverso da un timestamp pandas. Per get prestazioni ottimali, è consigliabile usare la funzionalità delle serie temporali pandas quando si usano timestamp in una funzione definita dall'utente pandas. Per informazioni dettagliate, vedere Funzionalità serie temporale/data.
Notebook di esempio
Il notebook seguente illustra i miglioramenti delle prestazioni che è possibile ottenere con le funzioni definite dall'utente pandas: