Funções de table definidas pelo usuário Python (UDTFs)
Importante
Esse recurso está no
Uma função de table definida pelo usuário (UDTF) permite que você registre funções que retornam tables em vez de valuesescalar. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, cada UDTF é invocado na cláusula FROM
de uma instrução SQL e retorna um table inteiro como saída.
Cada chamada UDTF pode aceitar zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos table que representam tablesde entrada inteira.
Sintaxe UDTF básica
O Apache Spark implementa UDTFs em Python como classes Python, com um método eval
obrigatório que utiliza yield
para emitir linhas de saída.
Para usar sua classe como UDTF, você deve importar a função PySpark udtf
. O Databricks recomenda usar essa função como decorador e especificar explicitamente nomes e tipos de campo usando a opção returnType
(a menos que a classe defina um método analyze
conforme descrito em uma seção posterior).
A UDTF a seguir cria um table utilizando um list constante com dois argumentos inteiros:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrar uma UDTF
As UDTFs são registadas no SparkSession
local e estão isoladas ao nível do notebook ou da tarefa.
Não é possível registrar UDTFs como objetos no Unity Cataloge UDTFs não podem ser usados com armazéns SQL.
Você pode registrar um UDTF no SparkSession
atual para uso em consultas SQL com a função spark.udtf.register()
. Forneça um nome para a função SQL e a classe UDTF do Python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Chamar um UDTF registrado
Uma vez registado, pode utilizar o UDTF em SQL recorrendo ao comando mágico %sql
ou à função spark.sql()
.
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Usar Apache Arrow
Se o seu UDTF recebe uma pequena quantidade de dados como entrada, mas produz uma grande table, o Databricks recomenda o uso da Seta Apache. Você pode habilitá-lo especificando o parâmetro useArrow
ao declarar o UDTF:
@udtf(returnType="c1: int, c2: int", useArrow=True)
Listas de argumentos variáveis - *args e **kwargs
Você pode usar a sintaxe Python *args
ou **kwargs
e implementar a lógica para lidar com um número não especificado de entradas values.
O exemplo a seguir retorna o mesmo resultado enquanto verifica explicitamente o comprimento e os tipos de entrada para os argumentos:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Aqui está o mesmo exemplo, mas usando argumentos de palavra-chave:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definir um schema estático no momento do registo
O UDTF retorna linhas com uma saída schema que consiste numa sequência ordenada de nomes e tipos de column. Se o UDTF schema deve permanecer sempre o mesmo para todas as consultas, pode especificar um schema fixo e estático após o decorador @udtf
. Deve ser um StructType
:
StructType().add("c1", StringType())
Ou uma cadeia de caracteres DDL que representa um tipo struct:
c1: string
Calcular um schema dinâmico no momento da chamada da função
UDTFs também podem calcular a saída schema de forma programática em cada chamada, dependendo do values dos argumentos de entrada. Para fazer isso, defina um método estático chamado analyze
que aceite zero ou mais parameters que correspondam aos argumentos fornecidos para a chamada UDTF específica.
Cada argumento do método analyze
é uma instância da classe AnalyzeArgument
que contém os seguintes campos:
AnalyzeArgument campo de classe |
Descrição |
---|---|
dataType |
O tipo do argumento de entrada como um DataType . Para os argumentos de entrada table, este é um StructType que representa o tabledo columns. |
value |
O valor do argumento de entrada como um Optional[Any] . Isso é None para argumentos table ou argumentos escalares literais que não são constantes. |
isTable |
Se o argumento de entrada é um table como um BooleanType . |
isConstantExpression |
Se o argumento de entrada é uma expressão sujeita a dobragem constante como um BooleanType . |
O método analyze
retorna uma instância da classe AnalyzeResult
, que inclui o resultado table's schema como um StructType
mais alguns campos opcionais. Se o UDTF aceitar um argumento de table de entrada, o AnalyzeResult
também poderá incluir uma maneira solicitada de partition e ordenar as linhas do table de entrada em várias chamadas UDTF, conforme descrito posteriormente.
Campo de classe AnalyzeResult |
Descrição |
---|---|
schema |
A schema do resultado table como um StructType . |
withSinglePartition |
Se todas as linhas de entrada devem ser enviadas para a mesma instância de classe UDTF que um BooleanType . |
partitionBy |
Se set não estiver vazio, todas as linhas com cada combinação única de values das expressões de particionamento serão processadas por uma instância separada da classe UDTF. |
orderBy |
Se set não estiver vazio, isso especifica uma ordenação das linhas dentro de cada partition. |
select |
Se set estiver não-vazio, trata-se de uma sequência de expressões que o UDTF está especificando para o Catalyst avaliar as expressões em relação ao columns a partir do argumento de entrada TABLE. O UDTF recebe um atributo de entrada para cada nome no list na ordem em que são listados. |
Este exemplo analyze
retorna um column de saída para cada palavra no argumento da cadeia de caracteres de entrada.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Encaminhar estado para chamadas eval
futuras
O método analyze
pode servir como um local conveniente para executar a inicialização e, em seguida, encaminhar os resultados para futuras invocações de método eval
para a mesma chamada UDTF.
Para fazer isso, crie uma subclasse de AnalyzeResult
e retorne uma instância da subclasse do método analyze
.
Em seguida, adicione um argumento adicional ao método __init__
para aceitar essa instância.
Este exemplo analyze
retorna uma saída constante schema, mas adiciona informações personalizadas nos metadados de resultado a serem consumidos por futuras chamadas de método __init__
:
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Linhas de saída de rendimento
O método eval
é executado uma vez para cada linha do argumento de table de entrada (ou apenas uma vez se nenhum argumento table for fornecido), seguido por uma invocação do método terminate
no final. Qualquer método produz zero ou mais linhas que estão em conformidade com o resultado schema produzindo tuplas, listas ou objetos pyspark.sql.Row
.
Este exemplo retorna uma linha fornecendo uma tupla de três elementos:
def eval(self, x, y, z):
yield (x, y, z)
Você também pode omitir os parênteses:
def eval(self, x, y, z):
yield x, y, z
Adicione uma vírgula à direita para retornar uma linha contendo apenas uma column:
def eval(self, x, y, z):
yield x,
Você também pode gerar um objeto pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
Este exemplo produz linhas de saída do método terminate
usando o Python list. Você pode armazenar o estado dentro da classe das etapas anteriores na avaliação do UDTF para esta finalidade.
def terminate(self):
yield [self.x, self.y, self.z]
Passar argumentos escalares para uma UDTF
Você pode passar argumentos escalares para um UDTF sob a forma de expressões constantes que incluam literais values ou funções baseadas nessas expressões. Por exemplo:
SELECT * FROM udtf(42, group => upper("finance_department"));
Transmitir table argumentos para uma UDTF
As UDTFs em Python podem aceitar um table como argumento de entrada, além de argumentos escalares de entrada. Um único UDTF também pode aceitar um argumento table e vários argumentos escalares.
Em seguida, qualquer consulta SQL pode aceitar um table como entrada usando a palavra-chave TABLE
seguida de parênteses envolvendo um tableidentifierapropriado, como TABLE(t)
. Como alternativa, você pode passar uma subconsulta table, como TABLE(SELECT a, b, c FROM t)
ou TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
O argumento de entrada table é então representado como um argumento pyspark.sql.Row
do método eval
, com uma chamada para o método eval
para cada linha na entrada table. Você pode usar anotações de campo padrão do PySpark column para interagir com columns em cada linha. O exemplo a seguir demonstra explicitamente como importar o tipo Row
do PySpark e, em seguida, filtrar o table passado no campo id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Para consultar a função, use a palavra-chave TABLE
SQL:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Especificar um particionamento das linhas de entrada provenientes de chamadas de função
Ao chamar um UDTF com um argumento table, qualquer consulta SQL pode partition o table de entrada em várias chamadas de UDTF com base na values de um ou mais tablecolumnsde entrada.
Para especificar um partition, use a cláusula PARTITION BY
na chamada de função após o argumento TABLE
.
Isso garante que todas as linhas de entrada com cada combinação exclusiva de values da columns de particionamento get consumidas por exatamente uma instância da classe UDTF.
Observe que, além de referências column simples, a cláusula PARTITION BY
também aceita expressões arbitrárias baseadas em tablecolumnsde entrada. Por exemplo, você pode especificar a LENGTH
de uma cadeia de caracteres, extrair um mês de uma data ou concatenar duas values.
Também é possível especificar WITH SINGLE PARTITION
em vez de PARTITION BY
para solicitar apenas um partition, em que todas as linhas de entrada devem ser consumidas por exatamente uma instância da classe UDTF.
Dentro de cada partition, você pode, opcionalmente, especificar uma ordenação necessária das linhas de entrada à medida que o método eval
do UDTF as consome. Para tal, forneça uma cláusula ORDER BY
após a cláusula PARTITION BY
ou WITH SINGLE PARTITION
acima descrita.
Por exemplo, considere o seguinte UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
Você pode especificar opções de particionamento ao chamar o UDTF sobre o table de entrada de várias formas:
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Especificar um particionamento das linhas de entrada a partir do método analyze
Note que, para cada uma das maneiras acima de particionar a entrada table ao chamar UDTFs em consultas SQL, existe uma maneira correspondente para que o método analyze
do UDTF especifique automaticamente o mesmo método de particionamento.
- Em vez de chamar um UDTF como
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, pode-se update o métodoanalyze
para set o campopartitionBy=[PartitioningColumn("a")]
e simplesmente chamar a função usandoSELECT * FROM udtf(TABLE(t))
. - Da mesma forma, em vez de especificar
TABLE(t) WITH SINGLE PARTITION ORDER BY b
na consulta SQL, você pode fazeranalyze
set os camposwithSinglePartition=true
eorderBy=[OrderingColumn("b")]
e, em seguida, apenas passarTABLE(t)
. - Em vez de passar
TABLE(SELECT a FROM t)
na consulta SQL, você pode fazeranalyze
setselect=[SelectedColumn("a")]
e, em seguida, apenas passarTABLE(t)
.
No exemplo a seguir, analyze
retorna uma saída constante schema, seleciona um subconjunto de columns do tablede entrada e especifica que a entrada table é particionada em várias chamadas UDTF com base na values do date
column:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])