ユーザー定義関数 (UDF) とは何ですか?
ユーザー定義関数 (UDF) を使用すると、Databricks の組み込み機能を拡張するコードを再利用および共有できます。UDFs を使用して、複雑な計算、変換、カスタムデータ操作などの特定のタスクを実行します。
UDF と Apache Spark 関数をいつ使用するか
UDFs は、組み込みの Apache Spark 関数では表現しにくいロジックに使用します。組み込みの Apache Spark 関数は、分散処理用に最適化されており、大規模でより優れたパフォーマンスを提供します。詳細については、「 関数」を参照してください。
Databricks では、アドホック クエリ、手動データクレンジング、探索的データ分析、および小規模から中規模のデータセットに対する操作に UDF を推奨しています。 UDF の一般的なユースケースには、データの暗号化、復号化、ハッシュ化、JSON 解析、検証などがあります。
Apache Spark メソッドは、非常に大規模なデータセットに対する操作や、ETL ジョブやストリーミング操作など、定期的または継続的に実行されるワークロードに対して使用します。
UDF タイプを理解する
次のタブから UDF タイプを選択すると、説明、例、および詳細を確認するためのリンクが表示されます。
- Scalar UDF
- Batch Scalar UDFs
- Non-Scalar UDFs
- UDAF
- UDTFs
スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。これらは、Unity Catalog で管理することも、セッション スコープにすることもできます。
次の例では、スカラー UDF を使用して、 name
列の各名前の長さを計算し、新しい列 name_length
に値を追加します。
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
これを PySpark を使用して Databricks ノートブックに実装するには、次のようにします。
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
「Unity Catalog のユーザー定義関数 (UDF)」と「ユーザー定義スカラー関数 - Python」を参照してください。
1:1の入力/出力行パリティを維持しながら、データをバッチで処理します。これにより、大規模なデータ処理のための行ごとの操作のオーバーヘッドが削減されます。また、バッチ UDF は、バッチ間の状態を維持して、より効率的に実行し、リソースを再利用し、データ チャンク間のコンテキストを必要とする複雑な計算を処理します。
これらは、Unity Catalog で管理することも、セッション スコープにすることもできます。
次のバッチ Unity Catalog Python UDF は、行のバッチの処理中に BMI を計算します。
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Unity Catalogのユーザー定義関数 (UDF) およびバッチ Python Unity Catalogのユーザー定義関数 (UDFs)を参照してください。
非スカラー UDF は、柔軟な入力/出力比 (1 または多く).
セッション スコープのバッチ Pandas UDF には、次の種類があります。
- シリーズからシリーズ
- シリーズのイテレータからシリーズのイテレータへ
- 複数のシリーズのイテレータからシリーズのイテレータへ
- 直列からスカラーへ
次に、シリーズ間 Pandas UDFの例を示します。
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
ユーザー定義関数Pandasを参照してください。
UDAFs 複数の行を操作し、1 つの集計結果を返します。 UDAFs はセッション スコープのみです。
次の UDA の例では、名前の長さでスコアを集計します。
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
PythonにおけるPandasユーザー定義関数およびのユーザー定義集計関数 - Scalaを参照してください。
UDTF は 1 つ以上の入力引数を受け取り、各入力行に対して複数の行 (場合によっては複数の列) を返します。UDTFはセッションスコープのみです。
次の例では、score 列の各値がカテゴリのリストに対応しています。UDTF は、コンマ区切りのリストを複数の行に分割します。
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Python ユーザー定義テーブル関数 (UDTF)を参照してください。
Unity Catalog のガバナンスとセッション スコープの UDFs
Unity Catalog Python UDFs とバッチ Unity Catalog Python UDFs は、ガバナンス、再利用、検出可能性を向上させるために、Unity Catalog に保持されます。他のすべての UDF はセッションベースであり、ノートブックまたはジョブで定義され、現在の SparkSession にスコープが設定されます。セッションスコープの UDF を定義してアクセスするには、Scala または Python を使用します。
Unity Catalog で管理された UDF のチート シート
Unity Catalog で管理される UDF を使用すると、カスタム関数を定義、使用、安全に共有、およびコンピューティング環境間で管理できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
UDFタイプ | 対応コンピュート | 説明 |
---|---|---|
Unity Catalog Python UDF |
| UDFPythonでUnity Catalog を定義し、ガバナンスのために に登録する。 スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 |
バッチ Unity Catalog Python UDF |
| UDFPythonでUnity Catalog を定義し、ガバナンスのために に登録する。 複数の値に対するバッチ操作を行い、複数の値を返します。大規模なデータ処理のための行ごとの操作のオーバーヘッドを削減します。 |
セッション スコープ UDFs ユーザー分離コンピュートのチート シート
セッション スコープの UDF は、ノートブックまたはジョブで定義され、現在の SparkSession にスコープが設定されます。セッションスコープの UDF を定義してアクセスするには、Scala または Python を使用します。
UDFタイプ | Databricks Runtimeのバージョン | 対応コンピュート | 説明 |
---|---|---|---|
Python スカラー |
| スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 | |
Pandas UDF(ベクトル化) |
| Pandas UDF は Apache Arrow を使用してデータを転送し、 Pandas データを操作します。 Pandas UDFs は、行ごとのスカラー UDFs.length よりもパフォーマンスを大幅に向上させることができるベクトル化された操作をサポートしています。 | |
Python の UDTF |
| UDTF は 1 つ以上の入力引数を受け取り、各入力行に対して複数の行 (場合によっては複数の列) を返します。 | |
Scala スカラー UDFs |
| スカラー UDF は 1 つの行で動作し、各行に対して 1 つの結果値を返します。 | |
Scala UDAFs |
| UDAFs 複数の行を操作し、1 つの集計結果を返します。 |
パフォーマンスに関する考慮事項
-
Scala UDFs は一般的に Python UDFs よりも高速です。
- 分離されていない Scala UDF は Java 仮想マシン (JVM) で実行されるため、JVM との間でデータを移動するオーバーヘッドを回避できます。
- 分離された Scala UDF は、JVM との間でデータを移動する必要がありますが、メモリをより効率的に処理するため、Python UDF よりも高速になる可能性があります。
-
PythonUDF とPandas UDF Scalaは、データをシリアル化し、JVM からPython インタープリターに移動する必要があるため、 UDF よりも遅くなる傾向があります。
- Pandas UDFs は、Apache Arrow を使用してシリアル化コストを削減するため、Python UDF よりも最大 100 倍高速です。