メインコンテンツまでスキップ

udtf

ユーザー定義テーブル関数 (UDTF) を作成します。

構文

Python
import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
def eval(self, *args):
# function body
yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

パラメーター

パラメーター

Type

説明

cls

class

オプション。Python ユーザー定義テーブル関数ハンドラー クラス。

returnType

pyspark.sql.types.StructType または str

オプション。ユーザー定義テーブル関数の戻り値の型。値は、StructType オブジェクトまたは DDL 形式の構造体型文字列のいずれかになります。None の場合、ハンドラー クラスはanalyze静的メソッドを提供する必要があります。

useArrow

bool

オプション。Arrow を使用して (デ) シリアル化を最適化するかどうか。None に設定すると、Spark 構成「spark.sql.execution.pythonUDTF.arrow.enabled」が使用されます。

例 1 : 基本的な UDTF 実装。

Python
from pyspark.sql.functions import udtf

class TestUDTF:
def eval(self, *args):
yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
Output
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+

例 2 : デコレータ構文を使用した UDTF。

Python
from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
def eval(self, x: int):
yield x, x + 1

PlusOne(lit(1)).show()
Output
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+

例 3 : 静的メソッドを分析する UDTF。

Python
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

def eval(self, a, b):
yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
Output
+---+---+
| a| b|
+---+---+
| 1| x|
+---+---+

例 4 : キーワード引数を使用した UDTF。

Python
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)

def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
Output
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+

例 5 : UDTF が登録され、SQL 経由で呼び出されます。

Python
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)

def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
Output
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+

例 6 : Arrow 最適化が有効になっている UDTF。

Python
from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
def eval(self, x: int):
yield x, x + 1

ArrowPlusOne(lit(1)).show()
Output
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+

例 7 : 決定論的な UDTF を作成する。

Python
from pyspark.sql.functions import udtf

class PlusOne:
def eval(self, a: int):
yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()