ユーザー定義のスカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。
Databricks Runtime 14.0 以降では、Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。Python ユーザー定義テーブル関数 (UDTF)を参照してください。
Databricks Runtime12.2LTS 以前では、標準アクセスモードを使用するUnity Catalogコンピュートでは、Python UDF と Pandas UDF はサポートされていません。Scalar Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。
Databricks Runtime13.3LTS Python以降では、Unity Catalog SQL構文を使用して するスカラー UDF を登録できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
Unity Catalog 対応クラスターでのPython UDF のGraviton インスタンス サポートは、Databricks Runtime 15.2 以降で使用できます。
UDF とUDAFは、Databricks Runtime15.2 以降の標準アクセス モードとUnity Catalog で構成されたGravitonクラスターでサポートされています。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
オプションで、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
UDF と DataFrames の併用
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
または、アノテーション構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、
部分式の評価。 特に、演算子または関数の入力はそうではありません
必ず左から右に評価するか、その他の固定された順序で評価されます。 たとえば、論理 AND
また、 OR
式には、左から右への "短絡" セマンティクスはありません。
そのため、Booleanの副作用や評価の順番に頼るのは危険です
式、および WHERE
節と HAVING
節の順序は、
クエリの最適化と計画中に並べ替えられます。 具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、
UDF を呼び出す前にヌル チェックが行われることを保証します。 例えば
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句は、null をフィルターで除外した後に strlen
UDF が呼び出されることを保証するものではありません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
- UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
IF
式またはCASE WHEN
式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
タスク実行コンテキストを取得する
TaskContext PySpark API を使用して、ユーザーの ID、クラスタータグ、spark ジョブ ID などのコンテキスト情報を取得します。 UDF でタスク コンテキストを取得するを参照してください。
制限
PySpark UDF には、次の制限が適用されます。
-
モジュールのインポート制限: PySpark 標準アクセスGit モード クラスターとサーバレス コンピュートの UDFUnity Catalog Databricks Runtimeは、 14.2 以前でモジュールをインポートするために、 フォルダ、ワークスペース ファイル、または ボリュームにアクセスできません。
-
ブロードキャスト変数: 標準アクセス モード クラスター と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。
-
インスタンスプロファイル: 標準アクセスモードの PySpark UDF クラスターとサーバレス コンピュートはインスタンスプロファイルをサポートしていません。
-
メモリ制限 : サーバレス コンピュート上の PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、次のエラーが発生します。
[UDF_PYSPARK_ERROR.OOM] Python worker exited unexpectedly (crashed) due to running out of memory.