Condividi tramite


Funzioni Pandas definite dall'utente

Una funzione definita dall'utente (UDF) pandas, nota anche come funzione definita dall'utente vettorializzata, è una funzione definita dall'utente che usa Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le UDF pandas consentono operazioni vettoriali che possono aumentare le prestazioni fino a 100 volte rispetto alle UDF Python riga per riga.

Per informazioni di base, vedere il post di blog Nuove UDF pandas e hint per i tipi Python nella prossima versione di Apache Spark 3.0.

È possibile definire una funzione pandas definita dall'utente usando la parola chiave pandas_udf come elemento Decorator ed eseguire il wrapping della funzione con un hint per il tipo Python. Questo articolo descrive i diversi tipi di UDF pandas e illustra come usare le UDF pandas con hint di tipo.

UDF da serie a serie

Per vettorizzare le operazioni scalari, usare una UDF pandas da serie a serie. È possibile utilizzarle con API come select e withColumn.

La funzione Python deve accettare una serie pandas come input e restituire una serie pandas con la stessa lunghezza ed è necessario specificarli negli hint per il tipo Python. Spark esegue una pandas UDF dividendo columns in batch, chiamando la funzione su ciascun batch come parte dei dati, quindi concatenando i risultati.

Nell'esempio seguente viene illustrato come creare una UDF di pandas che calcola il prodotto di 2 columns.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF da iteratore di serie a iteratore di serie

Una UDF iteratore è identica a una funzione UDF pandas scalare, ad eccezione di:

  • La funzione Python
    • Accetta un iteratore di batch anziché un singolo batch di input come input.
    • Restituisce un iteratore di batch di output anziché un singolo batch di output.
  • La lunghezza dell'intero output nell'iteratore deve corrispondere alla lunghezza dell'intero input.
  • La Pandas UDF incapsulata accetta un singolo input column di Spark.

È necessario specificare l'hint per il tipo Python come Iterator[pandas.Series] ->Iterator[pandas.Series].

Questa UDF pandas è utile quando l'esecuzione UDF richiede l'inizializzazione di uno stato, ad esempio il caricamento di un file del modello di Machine Learning per applicare l'inferenza a ogni batch di input.

L'esempio seguente illustra come creare una UDF pandas con supporto iteratore.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF da iteratore di più serie a iteratore di serie

Una UDF da iteratore di più serie a iteratore di serie presenta caratteristiche e restrizioni simili alla UDF da iteratore di serie a iteratore di serie. La funzione specificata accetta un iteratore di batch e restituisce un iteratore di batch. È utile anche quando l'esecuzione della funzione definita dall'utente richiede l'inizializzazione di uno stato.

Le differenze sono le seguenti:

  • La funzione Python sottostante accetta un iteratore di una tupla di serie pandas.
  • La funzione definita dall'utente pandas di cui è stato eseguito il wrapping accetta più Spark columns come input.

Specificare gli hint di tipo come Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF da serie a scalare

Le UDF pandas da serie a scalari sono simili alle funzioni di aggregazione di Spark. Una funzione UDF (User Defined Function) di pandas da Serie a scalare definisce un'aggregazione da una o più Serie pandas a un valore scalare, where ogni Serie pandas rappresenta una sotto-unità di Spark column. Si usa una serie per scalare la funzione definita dall'utente pandas con API come select, withColumn, groupBy.agge pyspark.sql.Window.

Si esprime l'hint di tipo come pandas.Series, ... ->Any. Il tipo restituito deve essere un tipo di dati primitivo e lo scalare restituito può essere un tipo primitivo Python, ad esempio int o float o un tipo di dati NumPy, ad esempio numpy.int64 o numpy.float64. Any deve essere idealmente un tipo scalare specifico.

Questo tipo di UDF non supporta l'aggregazione parziale e tutti i dati per ogni gruppo vengono caricati in memoria.

L'esempio seguente mostra come usare questo tipo di UDF per calcolare la media con le operazioni select, groupBy e window:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Per informazioni dettagliate sull'utilizzo, vedere pyspark.sql.functions.pandas_udf.

Utilizzo

Impostazione delle dimensioni del batch Arrow

Nota

Questa configurazione non ha alcun impatto sul calcolo configurato con la modalità di accesso condiviso e Databricks Runtime LTS da 13.3 a 14.2.

Le partizioni di dati in Spark vengono convertite in batch di record Arrow, che possono causare temporaneamente un utilizzo elevato della memoria nella JVM. Per evitare possibili eccezioni di memoria insufficiente, è possibile modificare le dimensioni dei batch di record Arrow impostando la configurazione spark.sql.execution.arrow.maxRecordsPerBatch su un numero intero che determina il numero massimo di righe per ogni batch. Il valore predefinito è 10.000 record per batch. Se il numero di columns è elevato, il valore deve essere regolato di conseguenza. Usando questo limit, ogni dato partition è suddiviso in uno o più gruppi di record per l'elaborazione.

Timestamp con semantica fuso orario

Spark archivia internamente i timestamp come UTC values, e i dati timestamp inseriti senza un fuso orario specificato vengono convertiti all'ora locale in UTC con risoluzione al microsecondo.

Quando i dati timestamp vengono esportati o visualizzati in Spark, il fuso orario della sessione viene usato per localizzare il timestamp values. Il fuso orario della sessione è impostato a set con la configurazione spark.sql.session.timeZone e viene impostato per impostazione predefinita sul fuso orario locale del sistema JVM. Pandas utilizza un tipo di datetime64 con risoluzione a nanosecondi, datetime64[ns], con fuso orario facoltativo su basecolumn.

Quando i dati timestamp vengono trasferiti da Spark a pandas, vengono convertiti in nanosecondi e ogni column viene adattato al fuso orario della sessione Spark. Successivamente, viene localizzato in quel fuso orario, eliminando il fuso orario e visualizzando values come ora locale. Ciò si verifica quando si chiama toPandas() o pandas_udf con timestamp columns.

Quando i dati timestamp vengono trasferiti da pandas a Spark, vengono convertiti in microsecondi UTC. Ciò si verifica quando si chiama createDataFrame con un dataframe pandas o quando si restituisce un timestamp da una UDF pandas. Queste conversioni vengono eseguite automaticamente per garantire che Spark abbia dati nel formato previsto, quindi non è necessario eseguire manualmente alcuna di queste conversioni. Qualsiasi nanosecondo values viene troncato.

Una funzione definita dall'utente standard carica i dati di timestamp come oggetti datetime Python, che è diverso da un timestamp pandas. Per get prestazioni ottimali, è consigliabile usare la funzionalità delle serie temporali pandas quando si usano timestamp in una funzione definita dall'utente pandas. Per informazioni dettagliate, vedere Funzionalità serie temporale/data.

Notebook di esempio

Il notebook seguente illustra i miglioramenti delle prestazioni che è possibile ottenere con le funzioni definite dall'utente pandas:

Notebook benchmark delle UDF pandas

Get portatile