メインコンテンツまでスキップ

ユーザー定義のスカラー関数 - Python

この記事には、Python ユーザー定義関数 (UDF) の例が含まれています。 UDF を登録する方法、UDF を呼び出す方法を示し、Spark SQL での部分式の評価順序に関する注意事項を示します。

Databricks Runtime 14.0 以降では、Pythonユーザー定義テーブル関数 (UDTF) を使用して、スカラー値ではなくリレーション全体を返す関数を登録できます。Python ユーザー定義テーブル関数 (UDTF)を参照してください。

注記

Databricks Runtime12.2LTS 以前では、標準アクセスモードを使用するUnity Catalogコンピュートでは、Python UDF と Pandas UDF はサポートされていません。Scalar Python UDF と Pandas UDF は、Databricks Runtime 13.3 LTS 以降ですべてのアクセス モードでサポートされています。

Databricks Runtime13.3LTS Python以降では、Unity Catalog SQL構文を使用して するスカラー UDF を登録できます。「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

Unity Catalog 対応クラスターでのPython UDF のGraviton インスタンス サポートは、Databricks Runtime 15.2 以降で使用できます。

important

UDF とUDAFは、Databricks Runtime15.2 以降の標準アクセス モードとUnity Catalog で構成されたGravitonクラスターでサポートされています。

関数を UDF として登録する

Python
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)

オプションで、UDF の戻り値の型を設定できます。 デフォルトの戻り値の型は StringTypeです。

Python
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQL で UDF を呼び出す

Python
spark.range(1, 20).createOrReplaceTempView("test")
SQL
%sql select id, squaredWithPython(id) as id_squared from test

UDF と DataFrames の併用

Python
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

または、アノテーション構文を使用して同じ UDF を宣言することもできます。

Python
from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

評価順序と null チェック

Spark SQL ( SQL 、 データフレーム 、データセット APIを含む)は、 部分式の評価。 特に、演算子または関数の入力はそうではありません 必ず左から右に評価するか、その他の固定された順序で評価されます。 たとえば、論理 AND また、 OR 式には、左から右への "短絡" セマンティクスはありません。

そのため、Booleanの副作用や評価の順番に頼るのは危険です 式、および WHERE 節と HAVING 節の順序は、 クエリの最適化と計画中に並べ替えられます。具体的には、UDF が null チェックのために SQL の短絡セマンティクスに依存している場合、 UDF を呼び出す前にヌル チェックが行われることを保証します。例えば

Python
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

この WHERE 句は、null をフィルターで除外した後に strlen UDF が呼び出されることを保証するものではありません。

適切な null チェックを実行するには、次のいずれかを実行することをお勧めします。

  • UDF 自体を null 対応にし、UDF 自体の内部で null チェックを行います
  • IF式またはCASE WHEN式を使用してヌルチェックを行い、条件分岐でUDFを呼び出します
Python
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok

スカラー Python UDF のサービス資格情報

スカラー Python UDF は、 Unity Catalog サービス資格情報を使用して、外部クラウドサービスに安全にアクセスできます。 これは、クラウドベースのトークン化、暗号化、シークレット管理などの操作をデータ変換に直接統合する場合に便利です。

スカラー Python UDF のサービス資格情報は、ウェアハウスと一般的なコンピュート SQLでのみサポートされています。

サービス資格情報を作成するには、「 サービス資格情報の作成」を参照してください。

サービス認証情報にアクセスするには、UDF ロジックの databricks.service_credentials.getServiceCredentialsProvider() ユーティリティを使用して、適切な認証情報でクラウド SDK を初期化します。すべてのコードは UDF 本体にカプセル化する必要があります。

Python
@udf
def use_service_credential():
from databricks.service_credentials import getServiceCredentialsProvider
import boto3

# Assuming there is a service credential named 'testcred' set up in Unity Catalog
boto3_session = boto3.Session(botocore_session=getServiceCredentialsProvider('testcred'))
# Use the S3 session to perform operations

サービス資格情報のアクセス許可

UDF の作成者は、Unity Catalog サービス資格情報に対する ACCESS アクセス許可を持っている必要があります。

No-PE スコープで実行される UDF (専用クラスタリングとも呼ばれます) には、サービス資格情報に対する MANAGE 権限が必要です。

デフォルトの資格情報

スカラー Python UDF で使用すると、 Databricks はコンピュート 環境変数のデフォルト サービス 資格情報を自動的に使用します。 この動作により、UDF コードで資格証明エイリアスを明示的に管理しなくても、外部サービスを安全に参照できます。「コンピュート・リソースのデフォルト・サービス資格証明の指定」を参照してください

デフォルト資格情報のサポートは、標準および専用アクセスモードのクラスタリングでのみ使用できます。 DBSQLでは使用できません。

Python
@udf
def use_service_credential():
from databricks.service_credentials import getServiceCredentialsProvider
import boto3

# The default service credential for the compute is automatically used
boto3_session = boto3.Session()
# Use the S3 client to perform operations

サービス認証情報の例 - AWS Lambda 関数

次の例では、サービス認証情報を使用して、スカラー Python UDF から AWS Lambda 関数を呼び出します。次のことを行います。

  1. Databricksサービス資格情報プロバイダーを使用して、デフォルトの資格情報を取得します。
  2. boto3セッションを設定します。
  3. Lambda 関数を呼び出して、入力文字列を処理します。
Python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def call_lambda_udf(input_str):
import boto3
import json
import base64
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

# Create a session using the default Unity Catalog service credential
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")

# Optionally attach Spark TaskContext metadata to the Lambda request
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

# Build the Lambda payload
payload = json.dumps({
"values": [input_str],
"is_debug": False
})

# Encode context for Lambda's client context
encoded_ctx = base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode("utf-8")

# Call the Lambda function
response = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=encoded_ctx,
Payload=payload,
)

response_payload = json.loads(response["Payload"].read().decode("utf-8"))

if "errorMessage" in response_payload:
raise Exception(response_payload["errorMessage"])

return response_payload["values"][0]

タスク実行コンテキストを取得する

TaskContext PySpark API を使用して、ユーザーの ID、クラスタータグ、spark ジョブ ID などのコンテキスト情報を取得します。 UDF でタスク コンテキストを取得するを参照してください。

制限

PySpark UDF には、次の制限が適用されます。

  • ファイルアクセス制限: Databricks Runtime 14.2 以前では、共有クラスタリング上の PySpark UDF は、Git フォルダ、ワークスペース ファイル、または Unity Catalog ボリュームにアクセスできません。

  • ブロードキャスト変数: 標準アクセス モード クラスター と サーバレス コンピュートの PySpark UDF は、ブロードキャスト変数をサポートしていません。

  • サービス資格情報: サービス資格情報は、バッチ Unity Catalog Python UDF とスカラー Python UDF でのみ使用できます。これらは、標準の Unity Catalog Python UDF ではサポートされていません。

  • サービス認証情報 : サービス認証情報は、サーバレス環境バージョン 3 以降を使用している場合にのみ、サーバレスコンピュートで使用できます。 サーバレス環境バージョンを参照してください

  • インスタンスプロファイル: 標準アクセスモードの PySpark UDF クラスターとサーバレス コンピュートはインスタンスプロファイルをサポートしていません。

  • サーバレスのメモリ制限 : サーバレス コンピュートの PySpark UDF には、 PySpark UDFあたり 1GB のメモリ制限があります。 この制限を超えると、タイプ UDF_PYSPARK_USER_CODE_ERROR のエラーが発生します 。MEMORY_LIMIT_SERVERLESS

  • 標準アクセスモードのメモリ制限 : 標準アクセスモードの PySpark UDF には、選択したインスタンスタイプの使用可能なメモリに基づいてメモリ制限があります。使用可能なメモリを超えると、タイプ UDF_PYSPARK_USER_CODE_ERROR のエラーが発生します 。MEMORY_LIMIT