Unity CatalogのバッチPythonユーザー定義関数 (UDF)
プレビュー
この機能は パブリック プレビュー段階です。
Unity CatalogのバッチPythonユーザー定義関数 (UDF)は、データのバッチを操作するPythonコードを記述できるようにすることで UDF の機能を拡張し、行ごとの UDF に関連するオーバーヘッドを削減することで効率を大幅に向上させます。これらの最適化により、Unity Catalog のバッチ Python UDF は大規模なデータ処理に最適なものとなります。
必要条件
バッチ Unity Catalog Python UDF には、 Databricks Runtime バージョン 16.3 以降が必要です。
バッチUnity Catalog Python UDFの作成
バッチ Unity Catalog Python UDF の作成は、通常の Unity Catalog UDF の作成と似ていますが、次の点が追加されています。
PARAMETER STYLE PANDAS
: これは、 UDF が Pandas イテレータを使用してバッチでデータを処理することを指定します。HANDLER 'handler_function'
: これは、バッチを処理するために呼び出されるハンドラ関数を指定します。
次の例は、バッチ Unity Catalog Python UDFを作成する方法を示しています。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
登録したら、UDF SQLまたは を使用してPython を呼び出すことができます。
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
バッチ UDF ハンドラ関数
バッチ Unity Catalog Python UDF には、バッチを処理して結果を生成するハンドラー関数が必要です。 HANDLER
キーを使用して UDF を作成するときは、ハンドラー関数の名前を指定する必要があります。
ハンドラ関数は、次の処理を行います。
- 1 つ以上の
pandas.Series
を反復処理するイテレータ引数を受け入れます。各pandas.Series
には、UDF の入力パラメーターが含まれています。 - ジェネレータを反復処理し、データを処理します。
- ジェネレータイテレータを返します。
バッチ Unity Catalog Python UDF は、入力と同じ数の行を返す必要があります。ハンドラー関数は、各バッチの入力シリーズと同じ長さの pandas.Series
を生成することで、これを保証します。
カスタム依存関係のインストール
バッチ Unity Catalog Python UDF の機能を Databricks Runtime 環境を超えて拡張するには、外部ライブラリのカスタム依存関係を定義します。
カスタム依存関係を使用した UDF の拡張を参照してください。
バッチ UDF は、1 つまたは複数のパラメーターを受け入れることができます
単一のパラメーター: ハンドラー関数が 1 つの入力パラメーターを使用する場合、各バッチの pandas.Series
を反復処理するイテレータを受け取ります。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
複数のパラメーター: 複数の入力パラメーターの場合、ハンドラー関数は、複数の pandas.Series
を反復処理するイテレータを受け取ります。 系列の値は、入力パラメーターと同じ順序です。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
コストのかかる操作を分離することでパフォーマンスを最適化
計算コストの高い操作を最適化するには、これらの操作をハンドラー関数から分離します。これにより、データのバッチに対するすべての反復ではなく、一度だけ実行されるようになります。
次の例は、負荷の高い計算が 1 回だけ実行されるようにする方法を示しています。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
バッチ Unity Catalog Python UDF のサービス資格情報
バッチ Unity Catalog Python UDF は、 Unity Catalog サービスの資格情報を使用して外部クラウド サービスにアクセスできます。 これは、セキュリティトークナイザーなどのクラウド機能をデータ処理ワークフローに統合する場合に特に便利です。
サービス資格情報を作成するには、「 サービス資格情報を使用して外部クラウド サービスへのアクセスを管理する」を参照してください。
使用するサービス資格情報を UDF 定義の CREDENTIALS
句で指定します。
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
サービス資格情報のアクセス許可
UDF 作成者は、Unity Catalog サービスの資格情報に対するACCESS
アクセス許可を持っている必要があります。ただし、UDF の呼び出し元の場合は、UDF に対する EXECUTE
権限を付与するだけで十分です。特に、UDF は UDF 作成者の認証情報権限を使用して実行されるため、UDF の呼び出し元は、基盤となるサービス認証情報にアクセスする必要はありません。
一時的な関数の場合、作成者は常に呼び出し元です。 No-PE スコープ で実行される UDF は、専用クラスタリングとも呼ばれ、代わりに呼び出し元の権限を使用します。
デフォルトの資格情報とエイリアス
CREDENTIALS
句には複数の認証情報を含めることができますが、DEFAULT
としてマークできるのは 1 つだけです。デフォルト以外の認証情報のエイリアスは、 AS
キーワードを使用して作成できます。各資格情報には、一意のエイリアスが必要です。
パッチが適用された Cloud SDK は、デフォルトの認証情報を自動的に取得します。デフォルトの資格情報 は、コンピュートの Spark 設定で指定されたデフォルトよりも優先され、 Unity Catalog UDF 定義に保持されます。
from databricks.service_credentials import getServiceCredentialsProvider
import boto3
# Assuming credential definition: CREDENTIALS(`aws-cred` AS testcred)
boto3_session = boto3.Session(botocore_session=getServiceCredentialsProvider('testcred'))
s3 = boto3_session.client('s3')
サービス認証情報の例 - AWS Lambda 関数
次の例では、サービス資格情報を使用して、バッチUnity Catalog Python UDFからAWS Lambda関数を呼び出します。
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
登録後に UDF を呼び出します。
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
タスク実行コンテキストを取得する
TaskContext PySpark API を使用して、ユーザーの ID、クラスタータグ、spark ジョブ ID などのコンテキスト情報を取得します。 UDF でタスク コンテキストを取得するを参照してください。
制限
- Python 関数は
NULL
値を個別に処理する必要があり、すべての型マッピングは Databricks SQL 言語マッピングに従う必要があります。 - バッチ Unity Catalog Python UDF は、セキュリティで保護された分離された環境で実行され、共有ファイル システムや内部サービスにはアクセスできません。
- ステージ内の複数の UDF 呼び出しはシリアル化され、中間結果がマテリアライズされ、ディスクにスピルする可能性があります。
- サービスの認証情報は、バッチ Unity Catalog Python UDF でのみ使用できます。 標準の Unity Catalog Python UDF または PySpark UDF ではサポートされていません。
- 専用クラスターおよび一時関数の場合、関数の呼び出し元は、サービスの資格情報に対する
ACCESS
アクセス許可を持っている必要があります。 サービス資格情報を使用して外部クラウド サービスにアクセスするためのアクセス許可を付与するを参照してください。