Python ユーザー定義テーブル関数 (UDTF)
プレビュー
この機能は、Databricks Runtime 14.3 LTS 以降で パブリック プレビュー 段階です。
ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりにテーブルを返す関数を登録することができます。 各呼び出しから 1 つの結果値を返すスカラー関数とは異なり、各 UDTF は SQL ステートメントの FROM
句で呼び出され、テーブル全体を出力として返します。
各UDTF呼び出しは、0個以上の引数を受け入れることができます。 これらの引数は、スカラー式、または入力テーブル全体を表すテーブル引数にすることができます。
基本的な UDTF 構文
Apache Spark は、Python UDTFs を Python クラスとして実装し、eval``yield
を使用して
出力行を出力します。
クラスを UDTF として使用するには、PySpark udtf
関数をインポートする必要があります。 Databricks の推奨事項
この関数をデコレータとして使用し、フィールド名と型を明示的に指定するために
returnType
オプション (後のセクションで説明するように、クラスが analyze
メソッドを定義している場合を除く)。
次の UDTF は、2 つの整数引数の固定リストを使用してテーブルを作成します。
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
| 3| -1|
+----+-----+
登録する a UDTF
UDTF はローカル SparkSession
に登録され、ノートブックまたはジョブ レベルで分離されます。
Unity Catalogでオブジェクトとして登録する UDTF はできず、UDTF を SQLウェアハウスで使用することはできません。
UDTF を現在のSparkSession
に登録して、関数 spark.udtf.register()
を使用して SQL クエリで使用できます。 SQL 関数と Python UDTF クラスの名前を指定します。
spark.udtf.register("get_sum_diff", GetSumDiff)
登録済みの UDTF を呼び出す
登録すると、%sql
magicコマンドまたはspark.sql()
を使用して、 SQL でUDTFを使用できます
機能:
spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);")
%sql
SELECT * FROM get_sum_diff(1,2);
Apache Arrow を使用する
UDTF が入力として少量のデータを受け取り、大きなテーブルを出力する場合、Databricks
Apache Arrowの使用を推奨します。 有効にするには、次の場合に useArrow
パラメーターを指定します。
UDTFを宣言します。
@udtf(returnType="c1: int, c2: int", useArrow=True)
変数引数リスト - *args と **kwargs
Python *args
または **kwargs
構文を使用して、不特定の数の入力値を処理するロジックを実装できます。
次の例では、引数の入力の長さと型を明示的にチェックしながら、同じ結果を返します。
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
これは同じ例ですが、キーワード引数を使用しています:
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, **kwargs):
x = kwargs["x"]
y = kwargs["y"]
yield x + y, x - y
GetSumDiff(x=lit(1), y=lit(2)).show()
登録時に静的スキーマを定義する
UDTF は、列名の順序付けられたシーケンスで構成される出力スキーマを持つ行を返します。
種類。 UDTF スキーマがすべてのクエリで常に同じままである必要がある場合は、静的、固定を指定できます
スキーマを @udtf
デコレータの後の 次のいずれかである必要がありますStructType
StructType().add("c1", StringType())
または、構造体タイプを表す DDL 文字列:
c1: string
コンピュート a dynamic schema at function call time
UDTFは、入力引数の値に応じて、呼び出しごとにプログラムで出力スキーマをコンピュートすることもできます。 これを行うには、特定の UDTF 呼び出しに指定された引数に対応する 0 個以上のパラメーターを受け入れる analyze
という静的メソッドを定義します。
analyze
メソッドの各引数は AnalyzeArgument
クラスのインスタンスで、次のフィールドが含まれます。
| 説明 |
---|---|
| 入力引数の型を |
| 入力引数の値は |
| 入力引数が |
| 入力引数が定数フォールド可能な式であるかどうか、 |
analyze
メソッドは、結果テーブルのスキーマをStructType
として含む AnalyzeResult
クラスのインスタンスと、いくつかのオプションフィールドを返します。UDTF が入力テーブルの引数を受け入れる場合、後で説明するように、 AnalyzeResult
には、入力テーブルの行を複数の UDTF 呼び出しに分割して順序付けする要求された方法を含めることもできます。
| 説明 |
---|---|
| 結果テーブルのスキーマを |
| すべての入力行を |
| 空でない値に設定すると、パーティション化式の値の一意の組み合わせを持つすべての行が、UDTF クラスの個別のインスタンスによって消費されます。 |
| 空でないに設定すると、各パーティション内の行の順序が指定されます。 |
| 空でない値に設定した場合、これは UDTF が Catalyst が入力 TABLE 引数の列に対して評価するために指定している一連の式です。 UDTF は、リスト内の名前ごとに 1 つの入力属性を、リストされている順序で受け取ります。 |
この例では analyze
入力文字列引数の単語ごとに 1 つの出力カラムを返します。
@udtf
class MyUDTF:
@staticmethod
def analyze(text: AnalyzeArgument) -> AnalyzeResult:
schema = StructType()
for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
schema = schema.add(f"word_{index}", IntegerType())
return AnalyzeResult(schema=schema)
def eval(self, text: str):
counts = {}
for word in text.split(" "):
if word not in counts:
counts[word] = 0
counts[word] += 1
result = []
for word in sorted(list(set(text.split(" ")))):
result.append(counts[word])
yield result
['word_0', 'word_1']
将来の eval
呼び出しに状態を転送する
analyze
メソッドは、初期化を実行し、その結果を同じ UDTF 呼び出しの将来の eval
メソッド呼び出しに転送するのに便利な場所として機能します。
これを行うには、 AnalyzeResult
のサブクラスを作成し、 analyze
メソッドからサブクラスのインスタンスを返します。次に、そのインスタンスを受け入れるための引数を __init__
メソッドに追加します。
この analyze
例では、定数出力スキーマを返しますが、将来の __init__
メソッド呼び出しで使用されるカスタム情報を結果メタデータに追加します。
@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
buffer: str = ""
@udtf
class TestUDTF:
def __init__(self, analyze_result=None):
self._total = 0
if analyze_result is not None:
self._buffer = analyze_result.buffer
else:
self._buffer = ""
@staticmethod
def analyze(argument, _) -> AnalyzeResult:
if (
argument.value is None
or argument.isTable
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be a non-empty string")
assert argument.dataType == StringType()
assert not argument.isTable
return AnalyzeResultWithBuffer(
schema=StructType()
.add("total", IntegerType())
.add("buffer", StringType()),
withSinglePartition=True,
buffer=argument.value,
)
def eval(self, argument, row: Row):
self._total += 1
def terminate(self):
yield self._total, self._buffer
self.spark.udtf.register("test_udtf", TestUDTF)
spark.sql(
"""
WITH t AS (
SELECT id FROM range(1, 21)
)
SELECT total, buffer
FROM test_udtf("abc", TABLE(t))
"""
).show()
+-------+-------+
| count | buffer|
+-------+-------+
| 20 | "abc"|
+-------+-------+
出力行の yield
eval
メソッドは、入力テーブル引数の各ローに対して1回(またはテーブル引数が指定されていない場合は1回だけ)実行され、最後にterminate
メソッドが1回呼び出されます。どちらの方法でも、タプル、リスト、または pyspark.sql.Row
オブジェクトを生成することにより、結果スキーマに準拠する 0 行以上の行を出力します。
この例では、次の 3 つの要素のタプルを指定して行を返します。
def eval(self, x, y, z):
yield (x, y, z)
括弧を省略することもできます。
def eval(self, x, y, z):
yield x, y, z
末尾にカンマを追加すると、列が 1 つだけの行が返されます。
def eval(self, x, y, z):
yield x,
また、 pyspark.sql.Row
オブジェクトを生成することもできます。
def eval(self, x, y, z)
from pyspark.sql.types import Row
yield Row(x, y, z)
この例では、Python リストを使用して terminate
メソッドから出力ローを生成します。 この目的のために、UDTF 評価の前の手順で取得した状態をクラス内に格納できます。
def terminate(self):
yield [self.x, self.y, self.z]
スカラー引数を UDTF に渡す
スカラー引数は、リテラル値またはそれに基づく関数で構成される定数式としてUDTFに渡すことができます。 例えば:
SELECT * FROM udtf(42, group => upper("finance_department"));
UDTFにテーブル引数を渡す
Python UDTFs は、スカラー入力引数に加えて、入力テーブルを引数として受け入れることができます。 1 つの UDTF は、テーブル引数と複数のスカラー引数を受け入れることもできます。
その後、任意の SQL クエリで、 TABLE
キーワードとそれに続く括弧を使用して入力テーブルを提供できます
適切なテーブル識別子 ( TABLE(t)
. または、テーブルを渡すこともできます
サブクエリ ( TABLE(SELECT a, b, c FROM t)
や
TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key))
。
入力テーブルの引数は、eval
メソッドのpyspark.sql.Row
引数として表されます。
入力テーブルの各行に対して eval
メソッドを 1 回呼び出します。標準のPySparkを使用できます
各行の列と対話するための列フィールド注釈。 次の例は、
PySpark Row
型を明示的にインポートし、渡されたテーブルを id
フィールドでフィルタリングします。
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
関数をクエリするには、 TABLE
SQL キーワードを使用します。
SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
関数呼び出しからの入力行のパーティション分割を指定します
テーブル引数を使用して UDTF を呼び出す場合、任意の SQL クエリで、1 つ以上の入力テーブル列の値に基づいて、入力テーブルを複数の UDTF 呼び出しに分割できます。
パーティションを指定するには、関数呼び出しの TABLE
引数の後に PARTITION BY
句を使用します。これにより、すべての入力行が、
パーティション分割列は、UDTF クラスの 1 つのインスタンスによってのみ使用されます。
単純なカラム参照に加えて、 PARTITION BY
句は任意の
入力テーブル列に基づく式。 たとえば、a の LENGTH
を指定できます。
文字列、日付から月を抽出する、または 2 つの値を連結します。
また、リクエストのみにPARTITION BY
する代わりにWITH SINGLE PARTITION
を指定することも可能です
すべての入力行が UDTF クラスの 1 つのインスタンスによって消費される必要がある 1 つのパーティション。
各パーティション内では、オプションで入力行の必要な順序を次のように指定できます。
UDTF の eval
メソッドはそれらを消費します。 これを行うには、 の後に ORDER BY
句を指定します。
上記のPARTITION BY
または WITH SINGLE PARTITION
条項。
たとえば、次の UDTF について考えてみます。
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="a: string, b: int")
class FilterUDTF:
def __init__(self):
self.key = ""
self.max = 0
def eval(self, row: Row):
self.key = row["a"]
self.max = max(self.max, row["b"])
def terminate(self):
yield self.key, self.max
spark.udtf.register("filter_udtf", FilterUDTF)
入力テーブルでUDTFを呼び出すときに、パーティション分割オプションを複数の方法で指定できます。
-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8)";
SELECT * FROM values_table;
+-------+----+
| a | b |
+-------+----+
| "abc" | 2 |
| "abc" | 4 |
| "def" | 6 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "abc" | 4 |
| "def" | 8 |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
| a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
| a | b |
+-------+----+
| "def" | 8 |
+-------+----+
analyze
メソッドからの入力行のパーティション分割を指定します
SQL クエリで UDTF を呼び出すときに入力テーブルをパーティション分割する上記の各方法には、UDTF の analyze
メソッドで同じパーティション分割方法を自動的に指定するための対応する方法があることに注意してください。
- UDTF を
SELECT * FROM udtf(TABLE(t) PARTITION BY a)
として呼び出す代わりに、analyze
メソッドを更新してフィールドをpartitionBy=[PartitioningColumn("a")]
に設定し、単にSELECT * FROM udtf(TABLE(t))
を使用して関数を呼び出すことができます。 - 同じトークンを使用して、 SQL クエリで
TABLE(t) WITH SINGLE PARTITION ORDER BY b
を指定する代わりに、フィールドwithSinglePartition=true
とorderBy=[OrderingColumn("b")]
を設定analyze
、TABLE(t)
を渡すことができます。 - SQLクエリで
TABLE(SELECT a FROM t)
を渡す代わりに、analyze
セットをselect=[SelectedColumn("a")]
にしてから、TABLE(t)
を渡すだけにすることができます。
次の例では、analyze
は定数の出力スキーマを返し、入力テーブルから列のサブセットを選択し、date
列の値に基づいて入力テーブルを複数の UDTF 呼び出しに分割することを指定します。
@staticmethod
def analyze(*args) -> AnalyzeResult:
"""
The input table will be partitioned across several UDTF calls based on the monthly
values of each `date` column. The rows within each partition will arrive ordered by the `date`
column. The UDTF will only receive the `date` and `word` columns from the input table.
"""
from pyspark.sql.functions import (
AnalyzeResult,
OrderingColumn,
PartitioningColumn,
)
assert len(args) == 1, "This function accepts one argument only"
assert args[0].isTable, "Only table arguments are supported"
return AnalyzeResult(
schema=StructType()
.add("month", DateType())
.add('longest_word", IntegerType()),
partitionBy=[
PartitioningColumn("extract(month from date)")],
orderBy=[
OrderingColumn("date")],
select=[
SelectedColumn("date"),
SelectedColumn(
name="length(word),
alias="length_word")])