ユーザー定義のスカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。
Databricks Runtime 14.0 以降では、Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。Python ユーザー定義テーブル関数 (UDTF)を参照してください。
Databricks Runtime12.2LTS 以前では、標準アクセスPython PandasUnity Catalogモードを使用するコンピュート UDF と UDF はサポートされていません。Scalar Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。
Databricks Runtime13.3LTS Python以降では、Unity Catalog SQL構文を使用して するスカラー UDF を登録できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
GravitonPythonUnity Catalog 対応クラスターでの UDF の インスタンス サポートは、Databricks Runtime 15.2 以降で使用できます。
UDF とUDAFs Gravitonは、Unity Catalog Databricks Runtime15.2 以降の標準アクセス モードと で構成された クラスターでサポートされています。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
オプションで、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
UDF と データフレーム の併用
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
または、アノテーション構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、
部分式の評価。 特に、演算子または関数の入力はそうではありません
必ず左から右に評価するか、その他の固定された順序で評価されます。 たとえば、論理 AND
また、 OR
式には、左から右への "短絡" セマンティクスはありません。
そのため、Booleanの副作用や評価の順番に頼るのは危険です
式、および WHERE
節と HAVING
節の順序は、
クエリの最適化と計画中に並べ替えられます。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、
UDF を呼び出す前にヌル チェックが行われることを保証します。 例えば
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句は、null をフィルターで除外した後に strlen
UDF が呼び出されることを保証するものではありません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
- UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
IF
式またはCASE WHEN
式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
制限
PySpark UDF には、次の制限が適用されます。
-
モジュールのインポート制限: PySpark 標準アクセスGit モード クラスターとサーバレス コンピュートの UDFUnity Catalog Databricks Runtimeは、 14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。
-
ブロードキャスト変数: 標準アクセス モード クラスター と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。
-
インスタンスプロファイル: 標準アクセスモードの PySpark UDF クラスターとサーバレス コンピュートはインスタンスプロファイルをサポートしていません。
-
メモリ制限 : サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.