다음을 통해 공유


Python UDF(사용자 정의 table 함수)

중요하다

이 기능은 Databricks Runtime 14.3 LTS 이상에서 공개 미리보기 상태입니다.

UDTF(사용자 정의 table 함수)를 사용하면 스칼라 values대신 tables 반환하는 함수를 등록할 수 있습니다. 각 호출에서 단일 결과 값을 반환하는 스칼라 함수와 달리 각 UDTF는 SQL 문의 FROM 절에서 호출되고 전체 table 출력으로 반환합니다.

각 UDTF 호출은 0개 이상의 인수를 수락할 수 있습니다. 이러한 인수는 스칼라 식이거나 전체 입력 tables나타내는 table 인수일 수 있습니다.

기본 UDTF 구문

Apache Spark는 Python UDTF를 Python 클래스 및 필수 eval 메서드로 구현하며, 이 메서드는 yield을 사용하여 출력 행을 내보냅니다.

클래스를 UDTF로 사용하려면 PySpark udtf 함수를 가져와야 합니다. Databricks는 이 함수를 데코레이터로 사용하고 returnType 옵션을 사용하여 필드 이름과 형식을 명시적으로 지정하는 것이 좋습니다(클래스가 이후 섹션에서 설명한 대로 analyze 메서드를 정의하지 않는 한).

다음 UDTF는 두 개의 정수 인수를 가진 고정 list을 사용하여 table를 생성합니다.

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

UDTF를 등록하기

UDTF는 로컬 SparkSession에 등록되며 노트북 또는 작업 수준에서 격리됩니다.

Unity Catalog에서 UDTF를 개체로 등록할 수 없으며 SQL 데이터 웨어하우스에서 UDTF를 사용할 수 없습니다.

현재 SparkSession에 SQL 쿼리에서 사용할 UDTF를 함수 spark.udtf.register()을 사용하여 등록할 수 있습니다. SQL 함수 및 Python UDTF 클래스의 이름을 제공합니다.

spark.udtf.register("get_sum_diff", GetSumDiff)

등록된 UDTF 호출

등록되면 %sql 매직 명령 또는 spark.sql() 함수를 사용하여 SQL에서 UDTF를 사용할 수 있습니다.

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);

Apache 화살표 사용

UDTF가 소량의 데이터를 입력으로 수신하지만 큰 table출력하는 경우 Databricks는 Apache Arrow를 사용하는 것이 좋습니다. UDTF를 선언할 때 useArrow 매개 변수를 지정하여 사용하도록 설정할 수 있습니다.

@udtf(returnType="c1: int, c2: int", useArrow=True)

변수 인수 목록 - *args 및 **kwargs

Python *args이나 **kwargs 구문을 사용하여 지정되지 않은 수의 입력 values을 처리하는 논리를 구현할 수 있습니다.

다음 예제에서는 인수의 입력 길이 및 형식을 명시적으로 확인하면서 동일한 결과를 반환합니다.

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

다음은 동일한 예제이지만 키워드 인수를 사용하는 예제입니다.

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

등록할 때 정적 schema을 정의합니다.

UDTF는 column 이름 및 형식의 정렬된 시퀀스로 구성된 출력 schema이 있는 행을 반환합니다. UDTF schema가 모든 쿼리에 대해 항상 동일하게 유지되어야 하는 경우, @udtf 데코레이터 뒤에 정적이고 고정된 schema을 지정할 수 있습니다. StructType중 하나여야 합니다.

StructType().add("c1", StringType())

또는 구조체 형식을 나타내는 DDL 문자열입니다.

c1: string

함수 호출 시 동적 schema를 계산합니다.

UDTF는 입력 인수의 values에 따라 각 호출에 대해 프로그램적으로 출력 schema를 계산할 수도 있습니다. 이렇게 하려면 특정 UDTF 호출에 제공된 인수에 해당하는 0개 이상의 parameters 허용하는 analyze라는 정적 메서드를 정의합니다.

analyze 메서드의 각 인수는 다음 필드를 포함하는 AnalyzeArgument 클래스의 인스턴스입니다.

AnalyzeArgument 클래스 필드 설명
dataType 입력 인수의 형식은 DataType입니다. 입력 table 인수의 경우, 이것은 table의 columns를 표현하는 StructType입니다.
value 입력 인수의 값은 Optional[Any]입니다. 이것은 상수가 아닌 table 인수 또는 리터럴 스칼라 인수의 경우에 대한 None입니다.
isTable 입력 인수가 BooleanType형식의 table인지 여부입니다.
isConstantExpression 입력 인수가 BooleanType와 같이 상수로 접힐 수 있는 표현식인지 여부입니다.

analyze 메서드는 결과 tableschemaStructType 및 일부 선택적 필드를 포함하는 AnalyzeResult 클래스의 인스턴스를 반환합니다. UDTF가 입력 table 인수를 수락하는 경우 AnalyzeResult 나중에 설명한 대로 여러 UDTF 호출에서 입력 table 행을 partition 순서를 지정하는 요청된 방법을 포함할 수도 있습니다.

AnalyzeResult 클래스 필드 묘사
schema 결과 schema의 table를 StructType로 한 것이다.
withSinglePartition 입력된 모든 행을 같은 BooleanTypeUDTF 클래스 인스턴스로 보낼지 여부입니다.
partitionBy set가 비어있지 않은 경우, 분할 식의 values 고유한 조합이 포함된 모든 행은 각각 별도의 UDTF 클래스 인스턴스에서 처리됩니다.
orderBy set이 비어있지 않은 경우, 각 partition내의 행 순서를 지정합니다.
select 비어있지 않은 set의 경우에는, 이것은 UDTF가 Catalyst로 하여금 입력 TABLE 인수의 columns을 평가하도록 지정하는 표현들의 시퀀스입니다. UDTF는 나열된 순서대로 list 각 이름에 대해 하나의 입력 특성을 받습니다.

analyze 예제에서는 입력 문자열 인수의 각 단어에 대해 하나의 출력 column 반환합니다.

@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result
['word_0', 'word_1']

상태를 향후 eval 호출로 전달

analyze 메서드는 초기화를 수행한 다음 결과를 동일한 UDTF 호출에 대한 향후 eval 메서드 호출로 전달하는 편리한 위치로 사용될 수 있습니다.

이렇게 하려면 AnalyzeResult 서브클래스를 만들고 analyze 메서드에서 서브클래스의 인스턴스를 반환합니다. 그런 다음 __init__ 메서드에 인수를 추가하여 해당 인스턴스를 수락합니다.

analyze 예제에서는 상수 출력 schema반환하지만 이후 __init__ 메서드 호출에서 사용할 결과 메타데이터에 사용자 지정 정보를 추가합니다.

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

출력 행 생성

eval 메서드는 입력 table 인수의 각 행에 대해 한 번 실행되며(또는 table 인수가 제공되지 않은 경우 한 번만), 끝에 terminate 메서드를 한 번 호출합니다. 두 메서드는 튜플, 목록 또는 pyspark.sql.Row 개체를 생성하여 결과 schema에 부합하는 0개 이상의 행을 생성합니다.

이 예제에서는 다음 세 가지 요소의 튜플을 제공하여 행을 반환합니다.

def eval(self, x, y, z):
  yield (x, y, z)

괄호를 생략할 수도 있습니다.

def eval(self, x, y, z):
  yield x, y, z

column하나만 있는 행을 반환하기 위해 후행 쉼표를 추가하십시오.

def eval(self, x, y, z):
  yield x,

pyspark.sql.Row 개체를 생성할 수도 있습니다.

def eval(self, x, y, z)
  from pyspark.sql.types import Row
  yield Row(x, y, z)

이 예제에서는 Python list사용하여 terminate 메서드에서 출력 행을 생성합니다. 이 목적을 위해 UDTF 평가의 이전 단계에서 클래스 내에 상태를 저장할 수 있습니다.

def terminate(self):
  yield [self.x, self.y, self.z]

UDTF에 스칼라 인수 전달

스칼라 인수를 리터럴 values 또는 이를 기반으로 하는 함수로 구성된 상수 표현식으로 UDTF에 전달할 수 있습니다. 예를 들어:

SELECT * FROM udtf(42, group => upper("finance_department"));

UDTF에 table 인수 전달

Python UDTF는 스칼라 입력 인수 외에도 입력 table을 인수로 받을 수 있습니다. 단일 UDTF는 table 인수와 여러 스칼라 인수를 수락할 수도 있습니다.

그런 다음 모든 SQL 쿼리는 TABLE 키워드와 TABLE(t)같은 적절한 tableidentifier둘러싼 괄호를 사용하여 입력 table 제공할 수 있습니다. 또는 TABLE(SELECT a, b, c FROM t) 또는 TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))같은 table 하위 쿼리를 전달할 수 있습니다.

입력 table 인수는 pyspark.sql.Row 인수로 eval 메서드에 표시된 후, 입력 table의 각 행에 대해 eval 메서드를 한 번 호출합니다. 표준 PySpark column 필드 주석을 사용하여 각 행의 columns과 상호 작용할 수 있습니다. 다음 예제에서는 PySpark Row 형식을 명시적으로 가져온 다음 id 필드에서 전달된 table 필터링하는 방법을 보여 줍니다.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

함수를 쿼리하려면 TABLE SQL 키워드를 사용합니다.

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

함수 호출에서 입력 행 분할 지정

table 인수를 사용하여 UDTF를 호출할 때, SQL 쿼리는 한 개 이상의 입력 tablecolumns의 values 에 따라 여러 UDTF 호출에 걸쳐 입력 table 를 partition 수 있습니다.

partition지정하려면 TABLE 인수 뒤의 함수 호출에서 PARTITION BY 절을 사용합니다. 이것은 분할 columns의 values의 고유한 조합 각각에 대해 모든 입력 행이 정확히 하나의 UDTF 클래스 인스턴스에 의해 get 사용되도록 보장합니다.

간단한 column 참조 외에도 PARTITION BY 절은 입력 tablecolumns따라 임의의 식을 허용합니다. 예를 들어 문자열의 LENGTH 지정하거나, 날짜에서 한 달을 추출하거나, 두 개의 values연결할 수 있습니다.

PARTITION BY 대신 WITH SINGLE PARTITION를 지정하여 모든 입력 행이 정확히 하나의 UDTF 클래스 인스턴스에 의해 처리되어야 하는 partition 하나만 요청할 수도 있습니다.

partition내에서 UDTF의 eval 메서드가 입력 행을 처리할 때, 해당 입력 행의 필수 순서를 선택적으로 지정할 수 있습니다. 이렇게 하려면 위에서 설명한 PARTITION BY 또는 WITH SINGLE PARTITION 절 다음에 ORDER BY 절을 제공합니다.

예를 들어 다음 UDTF를 고려합니다.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

입력 table UDTF를 호출할 때 여러 가지 방법으로 분할 옵션을 지정할 수 있습니다.

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

analyze 메서드에서 입력 행 분할 지정

SQL 쿼리에서 UDTF를 호출할 때 입력 table의 분할 방법 중 각 방법에 대해, UDTF의 analyze 메서드가 같은 분할 방법을 자동으로 지정할 수 있는 방법이 있습니다.

  • UDTF를 SELECT * FROM udtf(TABLE(t) PARTITION BY a)으로 호출하는 대신 analyze 메서드를 update하여 필드 partitionBy=[PartitioningColumn("a")]를 set하고 SELECT * FROM udtf(TABLE(t))를 사용하여 함수를 호출할 수 있습니다.
  • 마찬가지로, SQL 쿼리에서 TABLE(t) WITH SINGLE PARTITION ORDER BY b를 지정하는 대신 analyzeset를 withSinglePartition=trueorderBy=[OrderingColumn("b")] 필드로 만들고, 그 후에 TABLE(t)를 전달할 수 있습니다.
  • SQL 쿼리에서 TABLE(SELECT a FROM t) 전달하는 대신 analyzesetselect=[SelectedColumn("a")] 한 다음 TABLE(t)전달할 수 있습니다.

다음 예제에서 analyze 상수 출력 schema반환하고, 입력 tablecolumns 하위 집합을 선택하고, 입력 tabledatecolumnvalues 따라 여러 UDTF 호출에서 분할되도록 지정합니다.

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add('longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word),
        alias="length_word")])