ユーザー定義のスカラー関数 - Python
この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。
Databricks Runtime 14.0 以降では、Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。Python ユーザー定義テーブル関数 (UDTF)を参照してください。
Databricks Runtime14.0 以前では、標準アクセスPython PandasUnity Catalogモードを使用する クラスターでは、 UDF と UDF はサポートされていません。スカラー Python UDF と Pandas UDF は、Databricks Runtime 14.1 以降のすべてのアクセス モードでサポートされています。
Databricks Runtime14.1 以降では、SQL 構文を使用してUnity CatalogにスカラーPython UDFを登録できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。
関数を UDF として登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
オプションで、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringType
です。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL で UDF を呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
UDF と DataFrames の併用
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
または、アノテーション構文を使用して同じ UDF を宣言することもできます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序と null チェック
Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、
部分式の評価。 特に、演算子または関数の入力はそうではありません
必ず左から右に評価するか、その他の固定された順序で評価されます。 たとえば、論理 AND
また、 OR
式には、左から右への "短絡" セマンティクスはありません。
そのため、Booleanの副作用や評価の順番に頼るのは危険です
式、および WHERE
節と HAVING
節の順序は、
クエリの最適化と計画中に並べ替えられます。具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、
UDF を呼び出す前にヌル チェックが行われることを保証します。例えば
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
この WHERE
句は、null をフィルターで除外した後に strlen
UDF が呼び出されることを保証するものではありません。
適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。
- UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
IF
式またはCASE WHEN
式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
スカラー Python UDF のサービス資格情報
スカラー Python UDF は、 Unity Catalog サービス資格情報を使用して、外部クラウドサービスに安全にアクセスできます。 これは、クラウドベースのトークン化、暗号化、シークレット管理などの操作をデータ変換に直接統合する場合に便利です。
スカラー Python UDF のサービス資格情報は、ウェアハウスと一般的なコンピュート SQLでのみサポートされています。
サービス資格情報を作成するには、「 サービス資格情報の作成」を参照してください。
サービス認証情報にアクセスするには、UDF ロジックの databricks.service_credentials.getServiceCredentialsProvider()
ユーティリティを使用して、適切な認証情報でクラウド SDK を初期化します。すべてのコードは UDF 本体にカプセル化する必要があります。
@udf
def use_service_credential():
from google.cloud import storage
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
client = storage.Client(project='your-project', credentials=getServiceCredentialsProvider('testcred'))
# Use the client to perform operations
サービス資格情報のアクセス許可
UDF の作成者は、Unity Catalog サービス資格情報に対する ACCESS アクセス許可を持っている必要があります。
No-PE スコープで実行される UDF (専用クラスタリングとも呼ばれます) には、サービス資格情報に対する MANAGE 権限が必要です。
デフォルトの資格情報
スカラー Python UDF で使用すると、 Databricks はコンピュート 環境変数のデフォルト サービス 資格情報を自動的に使用します。 この動作により、UDF コードで資格証明エイリアスを明示的に管理しなくても、外部サービスを安全に参照できます。「コンピュート・リソースのデフォルト・サービス資格証明の指定」を参照してください
デフォルト資格情報のサポートは、標準および専用アクセスモードのクラスタリングでのみ使用できます。 DBSQLでは使用できません。
@udf
def use_service_credential():
import google.auth # this import is always needed to trigger the SDK monkeypatching
from google.cloud import storage
# the client automatically uses the default credential
client = storage.Client(project='your-project')
# you can also get the token explicitly from the default provider
from google.auth import default
credentials, _ = default()
token = credentials.token
# Use the client to perform operations
タスク実行コンテキストを取得する
TaskContext PySpark API を使用して、ユーザーの ID、クラスタータグ、spark ジョブ ID などのコンテキスト情報を取得します。 UDF でタスク コンテキストを取得するを参照してください。
制限
PySpark UDF には、次の制限が適用されます。
-
ファイルアクセス制限: Databricks Runtime 14.2 以前では、共有クラスタリング上の PySpark UDF は、Git フォルダ、ワークスペース ファイル、または Unity Catalog ボリュームにアクセスできません。
-
ブロードキャスト変数: 標準アクセス モード クラスター と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。
-
サービス資格情報: サービス資格情報は、バッチ Unity Catalog Python UDF とスカラー Python UDF でのみ使用できます。これらは、標準の Unity Catalog Python UDF ではサポートされていません。
-
サービス資格情報 : サービス資格情報は、専用コンピュートではサポートされていません。
-
サービス認証情報 : サービス認証情報は、サーバレス環境バージョン 3 以降を使用している場合にのみ、サーバレスコンピュートで使用できます。 サーバレス環境バージョンを参照してください。
-
サーバレスのメモリ制限 : サーバレス コンピュートの PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、タイプ UDF_PYSPARK_USER_CODE_ERROR のエラーが発生します 。MEMORY_LIMIT_SERVERLESS。
-
標準アクセスモードのメモリ制限 : 標準アクセスモードの PySpark UDF には、選択したインスタンスタイプの使用可能なメモリに基づいてメモリ制限があります。使用可能なメモリを超えると、タイプ UDF_PYSPARK_USER_CODE_ERROR のエラーが発生します 。MEMORY_LIMIT。