udtf

Creates a user-defined table function (UDTF).

Syntax

Python
import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Parameters

cls (class)
    Optional. The Python user-defined table function handler class.

returnType (pyspark.sql.types.StructType or str)
    Optional. The return type of the user-defined table function. The value can be either a StructType object or a DDL-formatted struct type string. If None, the handler class must provide an analyze static method.

useArrow (bool)
    Optional. Whether to use Arrow to optimize (de)serialization. When set to None, the Spark config "spark.sql.execution.pythonUDTF.arrow.enabled" is used.
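
Because returnType accepts either form, the schema can also be built programmatically. A minimal sketch passing a StructType object instead of a DDL string (the class and column names here are illustrative, not part of the API):

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import StringType, StructType

# Equivalent to returnType="c1: string, c2: string", but built as an object.
@udtf(returnType=StructType().add("c1", StringType()).add("c2", StringType()))
class HelloUDTF:
    def eval(self):
        yield "hello", "world"

HelloUDTF().show()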

Examples

Example 1: Basic UDTF implementation.

Python
from pyspark.sql.functions import udtf

class TestUDTF:
    def eval(self, *args):
        yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
Output
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+

Example 2: UDTF using decorator syntax.

Python
from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(1)).show()
Output
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+

Example 3: UDTF with analyze static method.

Python
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

    def eval(self, a, b):
        yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
Output
+---+---+
| a| b|
+---+---+
| 1| x|
+---+---+

Example 4: UDTF with keyword arguments.

Python
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType()
            .add("a", a.dataType)
            .add("b", b.dataType)
            .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
Output
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+

Example 5: UDTF registered and called via SQL.

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType()
            .add("a", a.dataType)
            .add("b", b.dataType)
            .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
Output
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+

Example 6: UDTF with Arrow optimization enabled.

Python
from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    def eval(self, x: int):
        yield x, x + 1

ArrowPlusOne(lit(1)).show()
Output
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
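
Arrow can also be enabled session-wide rather than per UDTF: when useArrow is left as None, udtf falls back to the Spark config named in the parameters above. A minimal sketch, assuming an active spark session:

Python
# Enable Arrow-optimized (de)serialization for all Python UDTFs in this
# session; udtf() calls that leave useArrow=None will pick this up.
spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "true")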

Example 7: Creating a deterministic UDTF.

Python
from pyspark.sql.functions import udtf

class PlusOne:
    def eval(self, a: int):
        yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()
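
Marking the UDTF as deterministic tells the optimizer that its output depends only on its inputs. The wrapped function is then invoked like any other UDTF; a minimal usage sketch with the expected result:

Python
from pyspark.sql.functions import lit

plus_one(lit(1)).show()
Output
+---+
|  r|
+---+
|  2|
+---+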