ユーザー定義のスカラー関数 - Python

この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。

Databricks Runtime 14.0 以降では、 Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。 Python ユーザー定義テーブル関数 (UDTF)を参照してください。

Databricks Runtime12.2LTS 以前では、共有アクセスPython PandasUnity Catalogモードを使用するコンピュート UDF と UDF はサポートされていません。Scalar Python UDFs と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。

Databricks Runtime13.3LTS 以降では、 構文を使用してスカラーPython UDF を に登録できます。Unity CatalogSQLUnity Catalogのユーザー定義関数 (UDF)を参照してください。

Unity Catalog 対応クラスター上の Python UDF に対する Graviton インスタンスのサポートは、Databricks Runtime 15.2 以降で利用できます。

重要

UDFUDAFs GravitonとUnity Catalog Databricks Runtime15.2 以降で共有アクセス モードと が構成された クラスターでサポートされます。

関数を UDF として登録する

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

必要に応じて、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringTypeです。

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL で UDF を呼び出す

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

DataFrames で UDF を使用する

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

または、注釈構文を使用して同じ UDF を宣言することもできます。

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

評価順序と null チェック

Spark SQL (SQL および DataFrame および DataFrame および DataDataset API を含む) は、部分式の評価順序を保証するものではありません。 特に、演算子または関数の入力は、必ずしも左から右、またはその他の固定された順序で評価されるとは限りません。 たとえば、論理 AND 式と OR 式には、左から右への "短絡" セマンティクスはありません。

したがって、 Boolean 式の副作用や評価順序、 WHERE 句や HAVING 句の順序は、クエリーの最適化や計画時に並べ替えることができるため、危険です。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、UDF を呼び出す前に null チェックが行われる保証はありません。 例えば

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

この WHERE 句は、null を除外した後に呼び出される strlen UDF を保証するものではありません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体をヌル対応にし、UDF 自体の内部でヌル チェックを行う

  • IF 式または CASE WHEN 式を使用して null チェックを実行し、条件分岐で UDF を呼び出す

spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

制限事項

PySpark UDF には、次の制限が適用されます。

  • モジュールのインポート制限:PySpark 共有クラスタリングおよびサーバレス コンピュートの UDFGit は、Unity Catalog Databricks Runtime14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。

  • ブロードキャスト変数: 共有クラスタリングおよびサーバレス コンピュート上の PySpark UDF は、ブロードキャスト変数をサポートしていません。

  • インスタンスプロファイル: 共有クラスタリングおよびサーバレス コンピュートの PySpark UDFs は、インスタンスプロファイルをサポートしていません。

  • メモリ制限: サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。 [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.