Funzioni definite dall'utente Python table (UDTF)
Importante
Questa funzionalità si trova in anteprima pubblica in Databricks Runtime 14.3 LTS e versioni successive.
Una funzione table definita dall'utente (UDTF) consente di registrare funzioni che restituiscono tables anziché scalari values. A differenza delle funzioni scalari che restituiscono un singolo valore di risultato da ogni chiamata, ogni funzione tabella definita dall'utente (UDTF) viene richiamata nella clausola FROM
di un'istruzione SQL e restituisce un'intera table come output.
Ogni chiamata UDTF può accettare zero o più argomenti. Questi argomenti possono essere espressioni scalari o argomenti table che rappresentano l'intero input tables.
Sintassi UDTF di base
Apache Spark implementa le funzioni definite dall'utente Python come classi Python con un metodo di eval
obbligatorio che usa yield
per generare righe di output.
Per usare la tua classe come UDTF, è necessario importare la funzione PySpark udtf
. Databricks consiglia di usare questa funzione come decorator e specificare in modo esplicito i nomi e i tipi di campo usando l'opzione returnType
(a meno che la classe non definisca un metodo analyze
come descritto in una sezione successiva).
La seguente funzione definita dall'utente crea un table utilizzando un list fisso di due argomenti interi:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
Registrare una UDTF
Le UDTF (User Defined Table Function) vengono registrate nel SparkSession
locale e sono isolate a livello di notebook o processo.
Non è possibile registrare le UDTF come oggetti in Unity Catalog, e le UDTF non possono essere utilizzate con i magazzini SQL.
È possibile registrare un UDTF per l'SparkSession
corrente da usare nelle query SQL con la funzione spark.udtf.register()
. Specificare un nome per la funzione SQL e la classe UDTF python.
spark.udtf.register("get_sum_diff", GetSumDiff)
Chiamare un UDTF che è registrato
Dopo la registrazione, è possibile usare UDTF in SQL usando il comando magic %sql
o la funzione spark.sql()
:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Usare Apache Arrow
Se la tua funzione definita dall'utente per le tabelle (UDTF) riceve una piccola quantità di dati come input ma produce un output tabledi grandi dimensioni, Databricks consiglia di utilizzare Apache Arrow. È possibile abilitarla specificando il parametro useArrow
quando si dichiara la UDTF.
@udtf(returnType="c1: int, c2: int", useArrow=True)
Elenchi di argomenti variabili - *args e **kwargs
È possibile usare la sintassi di Python *args
o **kwargs
e implementare la logica per gestire un numero non specificato di input values.
Nell'esempio seguente viene restituito lo stesso risultato controllando in modo esplicito la lunghezza e i tipi di input per gli argomenti:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Di seguito è riportato lo stesso esempio, ma si usano argomenti di parole chiave:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
Definire un schema statico in fase di registrazione
La funzione tabella definita dall'utente (UDTF) restituisce righe con un output schema che comprende una sequenza ordinata di nomi e tipi di column. Se il UDTF schema deve rimanere sempre lo stesso per tutte le query, è possibile specificare un schema fisso e statico dopo il decoratore @udtf
. Deve essere un StructType
:
StructType().add("c1", StringType())
Oppure una stringa DDL che rappresenta un tipo di struct:
c1: string
Calcolare un schema dinamico al momento della chiamata della funzione
Le funzioni di tabella definite dall'utente (UDTF) possono anche calcolare in modo programmato l'output schema per ciascuna chiamata, a seconda del values degli argomenti di input. A tale scopo, definire un metodo statico denominato analyze
che accetta zero o più parameters che corrispondono agli argomenti forniti alla chiamata UDTF specifica.
Ogni argomento del metodo analyze
è un'istanza della classe AnalyzeArgument
che contiene i campi seguenti:
campo classe AnalyzeArgument |
Descrizione |
---|---|
dataType |
Tipo del parametro di ingresso come DataType . Per gli argomenti di input table, si tratta di un StructType che rappresenta il columnsdell'table. |
value |
Valore dell'argomento di input come Optional[Any] . Si tratta di None per argomenti table o argomenti scalari letterali che non sono costanti. |
isTable |
Indica se l'argomento di input è un table come BooleanType . |
isConstantExpression |
Indica se l'argomento di input è un'espressione costante pieghevole come BooleanType . |
Il metodo analyze
restituisce un'istanza della classe AnalyzeResult
, che include il schema del risultato tablecome StructType
, più alcuni campi facoltativi. Se la UDTF accetta un argomento di input table, il AnalyzeResult
può anche includere un modo richiesto per partition e ordinare le righe del table di input attraverso diverse chiamate UDTF, come descritto più avanti.
campo classe AnalyzeResult |
Descrizione |
---|---|
schema |
Il schema del risultato table è considerato un StructType . |
withSinglePartition |
Indica se inviare tutte le righe di input alla stessa istanza della classe UDTF come un BooleanType . |
partitionBy |
Se set è non vuoto, tutte le righe con ogni combinazione univoca di espressioni di partizionamento values vengono utilizzate da un'istanza separata della classe UDTF. |
orderBy |
Se set non è vuoto, specifica un ordinamento delle righe all'interno di ogni partition. |
select |
Se set è non vuoto, si tratta di una sequenza di espressioni che la funzione di tabella definita dall'utente specifica affinché Catalyst le valuti contro il columns nell'argomento TABLE di input. La UDTF riceve un attributo di input per ogni nome nel list nell'ordine in cui sono elencati. |
Questo esempio analyze
restituisce un output column per ciascuna parola nella stringa di input.
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
Trasmetti lo stato alle chiamate future eval
Il metodo analyze
può fungere da luogo pratico per eseguire l'inizializzazione e quindi inoltrare i risultati alle future chiamate al metodo eval
per la stessa chiamata UDTF.
A tale scopo, creare una sottoclasse di AnalyzeResult
e restituire un'istanza della sottoclasse dal metodo analyze
.
Aggiungere quindi un argomento aggiuntivo al metodo __init__
per accettare tale istanza.
Questo esempio analyze
restituisce un output costante schema, ma aggiunge informazioni personalizzate nei metadati del risultato per essere utilizzato da future chiamate del metodo __init__
.
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
Generare righe di output
Il metodo eval
viene eseguito una volta per ogni riga dell'argomento table di input (o una sola volta se non viene fornito alcun argomento table), seguito da una chiamata del metodo terminate
alla fine. Il metodo restituisce zero o più righe conformi al risultato schema producendo tuple, elenchi o oggetti pyspark.sql.Row
.
In questo esempio, una tupla di tre elementi restituirà una riga.
def eval(self, x, y, z):
yield (x, y, z)
È anche possibile omettere le parentesi:
def eval(self, x, y, z):
yield x, y, z
Aggiungere una virgola finale per restituire una riga con un solo column:
def eval(self, x, y, z):
yield x,
È anche possibile produrre un oggetto pyspark.sql.Row
.
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
In questo esempio, le righe di output sono restituite dal metodo terminate
utilizzando un listdi Python. È possibile archiviare lo stato all'interno della classe dai passaggi precedenti nella valutazione UDTF per questo scopo.
def terminate(self):
yield [self.x, self.y, self.z]
Passare argomenti scalari a una UDTF (funzione definita dall'utente)
È possibile passare argomenti scalari a un tipo definito dall'utente come espressioni costanti che comprendono values letterali o funzioni basate su di esse. Per esempio:
SELECT * FROM udtf(42, group => upper("finance_department"));
Passa argomenti table a una funzione definita dall'utente (UDTF)
Le UDTF (funzioni definite dall'utente di tipo tabella) Python possono accettare un input table come argomento oltre agli argomenti di input scalari. Un singolo UDTF può anche accettare un argomento table e più argomenti scalari.
Qualsiasi query SQL può quindi fornire un table di input usando la parola chiave TABLE
seguita da parentesi che circondano un tableidentifierappropriato, come ad esempio TABLE(t)
. In alternativa, è possibile passare una sottoquery table, ad esempio TABLE(SELECT a, b, c FROM t)
o TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
.
L'argomento table di input viene quindi rappresentato come argomento pyspark.sql.Row
al metodo eval
, con una chiamata al metodo eval
per ogni riga dell'input table. È possibile usare le annotazioni standard dei campi PySpark column per interagire con columns in ogni riga. Nell'esempio seguente viene illustrata l'importazione esplicita del tipo di Row
PySpark e il filtro dei table passati nel campo id
:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
Per eseguire una query sulla funzione, usare la parola chiave SQL TABLE
:
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Specificare un partizionamento delle righe di input dalle chiamate di funzione.
Quando si chiama una UDTF con un argomento table, qualsiasi query SQL può partition l'input table in diverse chiamate UDTF in base al values di uno o più tablecolumnsdi input.
Per specificare un partition, utilizzare la clausola PARTITION BY
nella chiamata di funzione dopo l'argomento TABLE
.
Ciò garantisce che tutte le righe di input con ogni combinazione univoca di values nel partizionamento columns verranno get consumate da una sola istanza della classe UDTF.
Si noti che oltre ai riferimenti column semplici, la clausola PARTITION BY
accetta anche espressioni arbitrarie basate sull'input tablecolumns. Ad esempio, è possibile specificare il LENGTH
di una stringa, estrarre un mese da una data o concatenare due values.
È anche possibile specificare WITH SINGLE PARTITION
anziché PARTITION BY
per richiedere una sola partition in cui tutte le righe di input devono essere utilizzate da una sola istanza della classe UDTF.
All'interno di ogni partition, è possibile specificare facoltativamente un ordinamento richiesto delle righe di input, mentre il metodo eval
dell'UDTF le elabora. A tale scopo, specificare una clausola ORDER BY
dopo la clausola PARTITION BY
o WITH SINGLE PARTITION
descritta in precedenza.
Si consideri, ad esempio, il seguente UDTF:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
È possibile specificare le opzioni di partizionamento quando si chiama la funzione di tabella definita dall'utente sull'input table in diversi modi.
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
Specificare un partizionamento delle righe di input dal metodo analyze
Si noti che per ciascuno dei modi precedenti di partizionamento dell’input table durante la chiamata di funzioni definite dall'utente nelle query SQL, esiste un modo corrispondente affinché il metodo analyze
della funzione definita dall'utente specifichi automaticamente lo stesso metodo di partizionamento.
- Anziché chiamare un tipo definito dall'utente come
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
, è possibile update il metodoanalyze
per set il campopartitionBy=[PartitioningColumn("a")]
e chiamare semplicemente la funzione usandoSELECT * FROM udtf(TABLE(t))
. - Con lo stesso token, invece di specificare
TABLE(t) WITH SINGLE PARTITION ORDER BY b
nella query SQL, è possibile creareanalyze
set i campiwithSinglePartition=true
eorderBy=[OrderingColumn("b")]
e quindi passare semplicementeTABLE(t)
. - Anziché passare
TABLE(SELECT a FROM t)
nella query SQL, è possibile creareanalyze
setselect=[SelectedColumn("a")]
e quindi passare soloTABLE(t)
.
Nell'esempio seguente, analyze
restituisce un output costante schema, seleziona un sottoinsieme di columns dall'input tablee specifica che l'input table viene suddiviso in diverse chiamate UDTF in base al values del date
column:
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])