Funções definidas pelo usuário do Pandas
Uma UDF (função definida pelo usuário) do Pandas, também conhecida como UDF vetorizada, é uma função definida pelo usuário que usa o Apache Arrow para transferir dados e Pandas para trabalhar com os dados. As UDFs do Pandas permitem operações vetorizadas que podem aumentar o desempenho em até 100 vezes em comparação com UDFs do Python de linha por vez.
Para informações gerais, confira a postagem do blog Novas UDFs do Pandas e dicas da função Type do Python na próxima versão do Apache Spark 3.0.
Você define uma UDF de Pandas usando a palavra-chave pandas_udf
como um decorador e encapsula a função com uma dica de tipo Python.
Este artigo descreve os diferentes tipos de UDFs do Pandas e mostra como usar as UDFs dos Pandas com dicas de tipo.
Série para UDF da série
Você usa uma UDF de Series para Series do Pandas para vetorizar operações escalares.
Você pode usá-los com APIs como select
e withColumn
.
A função Python deve ter uma Série Pandas como entrada e retornar uma Série Pandas do mesmo tamanho, e você deve especificá-los nas dicas de tipo do Python. O Spark executa uma UDF pandas dividindo colunas em lotes, chamando a função para cada lote como um subconjunto dos dados e concatenando os resultados.
O exemplo a seguir mostra como criar uma UDF do Pandas que calcula o produto de duas colunas.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
Iterador de série para iterador da série UDF
Uma UDF de iterador é o mesmo que um UDF do Pandas escalar, exceto:
- A função do Python
- Aceita um iterador de lotes em vez de um único lote de entrada como entrada.
- Retorna um iterador de lotes de saída em vez de um único lote de saída.
- O comprimento de toda a saída no iterador deve ser o mesmo que o comprimento de toda a entrada.
- A UDF do Pandas empacotada aceita uma única coluna Spark como entrada.
Você deve especificar a dica de tipo do Python como Iterator[pandas.Series]
->Iterator[pandas.Series]
.
Essa UDF do Pandas é útil quando a execução da UDF exige a inicialização de algum estado, por exemplo, o carregamento de um arquivo de modelo de machine learning para aplicar a inferência a cada lote de entrada.
O exemplo a seguir mostra como criar uma UDF do Pandas com suporte a iterador.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
Iterador de várias séries para iterador da série de UDF
Um iterador de várias séries para iterador da série de UDF tem características e restrições semelhantes como Iterador de série para iterador da série de UDF. A função especificada recebe um iterador de lotes e saída de um iterador de lotes. Também é útil quando a execução da UDF requer a inicialização de algum estado.
As diferenças são:
- A função Python subjacente recebe um iterador de uma tupla da Série do Pandas.
- A UDF do Pandas empacotado aceita várias colunas do Spark como entrada.
Especifique as dicas de tipo como Iterator[Tuple[pandas.Series, ...]]
->Iterator[pandas.Series]
.
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
Série para UDF escalar
As séries para UDFs escalares do Pandas são semelhantes às funções de agregação do Spark.
Uma Série para escalar a UDF do Pandas define uma agregação de uma ou mais Séries do Pandas para um valor escalar, em que cada Série do Pandas representa uma coluna Spark.
Você usa uma Série para escalar a UDF do Pandas com APIs como select
, withColumn
, groupBy.agg
e pyspark.sql.Window.
Expresse a dica de tipo como pandas.Series, ...
->Any
. O tipo de retorno deve ser um tipo de dados primitivo e o escalar retornado pode ser um tipo primitivo do Python, por exemplo, int
ou float
um tipo de dados NumPy, como numpy.int64
ou numpy.float64
. Any
O ideal é ser um tipo escalar específico.
Esse tipo de UDF não dá suporte à agregação parcial e todos os dados de cada grupo são carregados na memória.
O exemplo a seguir mostra como usar esse tipo de UDF para calcular a média com as operações select
, groupBy
, e window
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
Para ver o uso detalhado, consulte pyspark.sql.functions.pandas_udf.
Uso
Definindo o tamanho do lote de seta
Observação
Essa configuração não tem impacto na computação configurada com modo de acesso partilhado e Databricks Runtime 13.3 LTS até 14.2.
As partições de dados no Spark são convertidas em lotes de registro de seta, o que pode levar temporariamente ao alto uso de memória na JVM. Para evitar possíveis exceções de falta de memória, você pode ajustar o tamanho dos lotes de registro de seta definindo a configuração spark.sql.execution.arrow.maxRecordsPerBatch
como um inteiro que determina o número máximo de linhas para cada lote. O valor padrão é 10.000 registros por lote. Se o número de colunas for grande, o valor deverá ser ajustado de acordo. Usando esse limite, cada partição de dados é dividida em um ou mais lotes de registro para processamento.
Carimbo de data/hora com semântica de fuso horário
O Spark armazena internamente os carimbos de data/hora como valores UTC e os dados de data/hora trazidos sem um fuso horário especificado são convertidos como hora local para UTC com resolução de microssegundos.
Quando os dados de data/hora são exportados ou exibidos no Spark, o fuso horário da sessão é usado para localizar os valores de carimbo de data/hora. O fuso horário da sessão é definido com a configuração spark.sql.session.timeZone
e assume como padrão o fuso horário local do sistema JVM. O Pandas usa um tipo datetime64
com resolução de nanossegundos, datetime64[ns]
, com fuso horário opcional por coluna.
Quando os dados de data/hora são transferidos do Spark para o Pandas, eles são convertidos em nanossegundos e cada coluna é convertida no fuso horário de sessão do Spark e, em seguida, localizada nesse fuso horário, que remove o fuso horário e exibe valores como hora local. Isso ocorre ao chamar ou toPandas()
com pandas_udf
colunas de carimbo de data/hora.
Quando os dados de data/hora são transferidos do Pandas para o Spark, eles são convertidos em microssegundos UTC. Isso ocorre ao chamar createDataFrame
com um DataFrame do Pandas ou ao retornar um carimbo de data/hora de uma UDF do Pandas. Essas conversões são feitas automaticamente para garantir que o Spark tenha dados no formato esperado, portanto, não é necessário fazer nenhuma dessas conversões por conta própria. Todos os valores de nanossegundos são truncados.
Uma UDF padrão carrega dados de data/hora como objetos datetime do Python, que é diferente de um data/hora pandas. Para obter o melhor desempenho, recomendamos que você use a funcionalidade de séries temporais pandas ao trabalhar com os carimbos de data/hora em uma UDF do Pandas. Para obter detalhes, consulte Funcionalidade de Séries Temporal/Data.
Caderno de exemplo
O notebook a seguir ilustra as melhorias de desempenho que você pode obter com UDFs do Pandas: