メインコンテンツまでスキップ

ユーザー定義のスカラー関数 - Python

この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。

Databricks Runtime 14.0 以降では、Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。Python ユーザー定義テーブル関数 (UDTF)を参照してください。

注記

Databricks Runtime12.2LTS 以前では、標準アクセスPython PandasUnity Catalogモードを使用するコンピュート UDF と UDF はサポートされていません。Scalar Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。

Databricks Runtime13.3LTS Python以降では、Unity Catalog SQL構文を使用して するスカラー UDF を登録できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

GravitonPythonUnity Catalog 対応クラスターでの UDF の インスタンス サポートは、Databricks Runtime 15.2 以降で使用できます。

important

UDF とUDAFs Gravitonは、Unity Catalog Databricks Runtime15.2 以降の標準アクセス モードと で構成された クラスターでサポートされています。

関数を UDF として登録する

Python
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)

オプションで、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringTypeです。

Python
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL で UDF を呼び出す

Python
spark.range(1, 20).createOrReplaceTempView("test")
SQL
%sql select id, squaredWithPython(id) as id_squared from test

UDF と データフレーム の併用

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

または、アノテーション構文を使用して同じ UDF を宣言することもできます。

Python
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

評価順序と null チェック

Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、 部分式の評価。 特に、演算子または関数の入力はそうではありません 必ず左から右に評価するか、その他の固定された順序で評価されます。 たとえば、論理 AND また、 OR 式には、左から右への "短絡" セマンティクスはありません。

そのため、Booleanの副作用や評価の順番に頼るのは危険です 式、および WHERE 節と HAVING 節の順序は、 クエリの最適化と計画中に並べ替えられます。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、 UDF を呼び出す前にヌル チェックが行われることを保証します。 例えば

Python
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

この WHERE 句は、null をフィルターで除外した後に strlen UDF が呼び出されることを保証するものではありません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
  • IF式またはCASE WHEN式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
Python
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok

制限

PySpark UDF には、次の制限が適用されます。

  • モジュールのインポート制限: PySpark 標準アクセスGit モード クラスターとサーバレス コンピュートの UDFUnity Catalog Databricks Runtimeは、 14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。

  • ブロードキャスト変数: 標準アクセス モード クラスター と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。

  • インスタンスプロファイル: 標準アクセスモードの PySpark UDF クラスターとサーバレス コンピュートはインスタンスプロファイルをサポートしていません。

  • メモリ制限 : サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。 [UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.