メインコンテンツまでスキップ

Python ユーザー定義テーブル関数 (UDTF)

備考

プレビュー

この機能は、Databricks Runtime 14.3 LTS 以降で パブリック プレビュー 段階です。

ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりにテーブルを返す関数を登録することができます。 各呼び出しから 1 つの結果値を返すスカラー関数とは異なり、各 UDTF は SQL ステートメントの FROM 句で呼び出され、テーブル全体を出力として返します。

各UDTF呼び出しは、0個以上の引数を受け入れることができます。 これらの引数は、スカラー式、または入力テーブル全体を表すテーブル引数にすることができます。

基本的な UDTF 構文

Apache Spark は、Python UDTFs を Python クラスとして実装し、eval``yield を使用して 出力行を出力します。

クラスを UDTF として使用するには、PySpark udtf 関数をインポートする必要があります。 Databricks の推奨事項 この関数をデコレータとして使用し、フィールド名と型を明示的に指定するために returnType オプション (後のセクションで説明するように、クラスが analyze メソッドを定義している場合を除く)。

次の UDTF は、2 つの整数引数の固定リストを使用してテーブルを作成します。

Python
from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
Output
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+

登録する a UDTF

UDTF はローカル SparkSession に登録され、ノートブックまたはジョブ レベルで分離されます。

Unity Catalogでオブジェクトとして登録する UDTF はできず、UDTF を SQLウェアハウスで使用することはできません。

UDTF を現在のSparkSessionに登録して、関数 spark.udtf.register() を使用して SQL クエリで使用できます。 SQL 関数と Python UDTF クラスの名前を指定します。

Python
spark.udtf.register("get_sum_diff", GetSumDiff)

登録済みの UDTF を呼び出す

登録すると、%sql magicコマンドまたはspark.sql()を使用して、 SQL でUDTFを使用できます 機能:

Python
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
Python
%sql
SELECT * FROM get_sum_diff(1,2);

Apache Arrow を使用する

UDTF が入力として少量のデータを受け取り、大きなテーブルを出力する場合、Databricks Apache Arrowの使用を推奨します。 有効にするには、次の場合に useArrow パラメーターを指定します。 UDTFを宣言します。

Python
@udtf(returnType="c1: int, c2: int", useArrow=True)

変数引数リスト - *args と **kwargs

Python *args または **kwargs 構文を使用して、不特定の数の入力値を処理するロジックを実装できます。

次の例では、引数の入力の長さと型を明示的にチェックしながら、同じ結果を返します。

Python
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

これは同じ例ですが、キーワード引数を使用しています:

Python
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

登録時に静的スキーマを定義する

UDTF は、列名の順序付けられたシーケンスで構成される出力スキーマを持つ行を返します。 種類。 UDTF スキーマがすべてのクエリで常に同じままである必要がある場合は、静的、固定を指定できます スキーマを @udtf デコレータの後の 次のいずれかである必要がありますStructType

Python
StructType().add("c1", StringType())

または、構造体タイプを表す DDL 文字列:

c1: string

コンピュート a dynamic schema at function call time

UDTFは、入力引数の値に応じて、呼び出しごとにプログラムで出力スキーマをコンピュートすることもできます。 これを行うには、特定の UDTF 呼び出しに指定された引数に対応する 0 個以上のパラメーターを受け入れる analyze という静的メソッドを定義します。

analyze メソッドの各引数は AnalyzeArgument クラスのインスタンスで、次のフィールドが含まれます。

AnalyzeArgument class フィールド

説明

dataType

入力引数の型を DataTypeとして指定します。 入力テーブルの引数の場合、これはテーブルの列を表す StructType です。

value

入力引数の値は Optional[Any] です。 これは、定数ではないテーブル引数またはリテラルスカラー引数の場合は None です。

isTable

入力引数が BooleanType のようなテーブルであるかどうか。

isConstantExpression

入力引数が定数フォールド可能な式であるかどうか、 BooleanType.

analyze メソッドは、結果テーブルのスキーマをStructTypeとして含む AnalyzeResult クラスのインスタンスと、いくつかのオプションフィールドを返します。UDTF が入力テーブルの引数を受け入れる場合、後で説明するように、 AnalyzeResult には、入力テーブルの行を複数の UDTF 呼び出しに分割して順序付けする要求された方法を含めることもできます。

AnalyzeResult class フィールド

説明

schema

結果テーブルのスキーマを StructTypeとして表示します。

withSinglePartition

すべての入力行を BooleanTypeとして同じUDTFクラスインスタンスに送信するかどうか。

partitionBy

空でない値に設定すると、パーティション化式の値の一意の組み合わせを持つすべての行が、UDTF クラスの個別のインスタンスによって消費されます。

orderBy

空でないに設定すると、各パーティション内の行の順序が指定されます。

select

空でない値に設定した場合、これは UDTF が Catalyst が入力 TABLE 引数の列に対して評価するために指定している一連の式です。 UDTF は、リスト内の名前ごとに 1 つの入力属性を、リストされている順序で受け取ります。

この例では analyze 入力文字列引数の単語ごとに 1 つの出力カラムを返します。

Python
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)

def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
Output
['word_0', 'word_1']

将来の eval 呼び出しに状態を転送する

analyzeメソッドは、初期化を実行し、その結果を同じ UDTF 呼び出しの将来の eval メソッド呼び出しに転送するのに便利な場所として機能します。

これを行うには、 AnalyzeResult のサブクラスを作成し、 analyze メソッドからサブクラスのインスタンスを返します。次に、そのインスタンスを受け入れるための引数を __init__ メソッドに追加します。

この analyze 例では、定数出力スキーマを返しますが、将来の __init__ メソッド呼び出しで使用されるカスタム情報を結果メタデータに追加します。

Python
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""

@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""

@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)

def eval(self, argument, row: Row):
self._total += 1

def terminate(self):
yield self._total, self._buffer

self.spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
Output
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+

出力行の yield

evalメソッドは、入力テーブル引数の各ローに対して1回(またはテーブル引数が指定されていない場合は1回だけ)実行され、最後にterminateメソッドが1回呼び出されます。どちらの方法でも、タプル、リスト、または pyspark.sql.Row オブジェクトを生成することにより、結果スキーマに準拠する 0 行以上の行を出力します。

この例では、次の 3 つの要素のタプルを指定して行を返します。

Python
def eval(self, x, y, z):
yield (x, y, z)

括弧を省略することもできます。

Python
def eval(self, x, y, z):
yield x, y, z

末尾にカンマを追加すると、列が 1 つだけの行が返されます。

Python
def eval(self, x, y, z):
yield x,

また、 pyspark.sql.Row オブジェクトを生成することもできます。

Python
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)

この例では、Python リストを使用して terminate メソッドから出力ローを生成します。 この目的のために、UDTF 評価の前の手順で取得した状態をクラス内に格納できます。

Python
def terminate(self):
yield [self.x, self.y, self.z]

スカラー引数を UDTF に渡す

スカラー引数は、リテラル値またはそれに基づく関数で構成される定数式としてUDTFに渡すことができます。 例えば:

SQL
SELECT * FROM udtf(42, group => upper("finance_department"));

UDTFにテーブル引数を渡す

Python UDTFs は、スカラー入力引数に加えて、入力テーブルを引数として受け入れることができます。 1 つの UDTF は、テーブル引数と複数のスカラー引数を受け入れることもできます。

その後、任意の SQL クエリで、 TABLE キーワードとそれに続く括弧を使用して入力テーブルを提供できます 適切なテーブル識別子 ( TABLE(t). または、テーブルを渡すこともできます サブクエリ ( TABLE(SELECT a, b, c FROM t)TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))

入力テーブルの引数は、evalメソッドのpyspark.sql.Row引数として表されます。 入力テーブルの各行に対して eval メソッドを 1 回呼び出します。標準のPySparkを使用できます 各行の列と対話するための列フィールド注釈。 次の例は、 PySpark Row 型を明示的にインポートし、渡されたテーブルを id フィールドでフィルタリングします。

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

関数をクエリするには、 TABLE SQL キーワードを使用します。

SQL
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
Output
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+

関数呼び出しからの入力行のパーティション分割を指定します

テーブル引数を使用して UDTF を呼び出す場合、任意の SQL クエリで、1 つ以上の入力テーブル列の値に基づいて、入力テーブルを複数の UDTF 呼び出しに分割できます。

パーティションを指定するには、関数呼び出しの TABLE 引数の後に PARTITION BY 句を使用します。これにより、すべての入力行が、 パーティション分割列は、UDTF クラスの 1 つのインスタンスによってのみ使用されます。

単純なカラム参照に加えて、 PARTITION BY 句は任意の 入力テーブル列に基づく式。 たとえば、a の LENGTH を指定できます。 文字列、日付から月を抽出する、または 2 つの値を連結します。

また、リクエストのみにPARTITION BYする代わりにWITH SINGLE PARTITIONを指定することも可能です すべての入力行が UDTF クラスの 1 つのインスタンスによって消費される必要がある 1 つのパーティション。

各パーティション内では、オプションで入力行の必要な順序を次のように指定できます。 UDTF の eval メソッドはそれらを消費します。 これを行うには、 の後に ORDER BY 句を指定します。 上記のPARTITION BY または WITH SINGLE PARTITION 条項。

たとえば、次の UDTF について考えてみます。

Python
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0

def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])

def terminate(self):
yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

入力テーブルでUDTFを呼び出すときに、パーティション分割オプションを複数の方法で指定できます。

SQL
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
Output
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
SQL
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
Output
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
SQL

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
Output
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
SQL
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
Output
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+

analyzeメソッドからの入力行のパーティション分割を指定します

SQL クエリで UDTF を呼び出すときに入力テーブルをパーティション分割する上記の各方法には、UDTF の analyze メソッドで同じパーティション分割方法を自動的に指定するための対応する方法があることに注意してください。

  • UDTF を SELECT * FROM udtf(TABLE(t) PARTITION BY a)として呼び出す代わりに、 analyze メソッドを更新してフィールドを partitionBy=[PartitioningColumn("a")] に設定し、単に SELECT * FROM udtf(TABLE(t))を使用して関数を呼び出すことができます。
  • 同じトークンを使用して、 SQL クエリで TABLE(t) WITH SINGLE PARTITION ORDER BY b を指定する代わりに、フィールド withSinglePartition=trueorderBy=[OrderingColumn("b")] を設定analyzeTABLE(t) を渡すことができます。
  • SQLクエリで TABLE(SELECT a FROM t) を渡す代わりに、 analyze セットを select=[SelectedColumn("a")] にしてから、 TABLE(t)を渡すだけにすることができます。

次の例では、analyze は定数の出力スキーマを返し、入力テーブルから列のサブセットを選択し、date 列の値に基づいて入力テーブルを複数の UDTF 呼び出しに分割することを指定します。

Python
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)

assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])