UDF でタスク コンテキストを取得する
TaskContext PySpark API を使用して、バッチ Unity Catalog Python UDF または PySpark UDFの実行中にコンテキスト情報を取得します。
たとえば、ユーザーの ID やクラスタータグなどのコンテキスト情報を使用すると、外部サービスにアクセスするためにユーザーの ID を検証できます。
必要条件
-
TaskContext は、Databricks Runtime バージョン 16.3 以降でサポートされています。
-
TaskContext は、次の UDF タイプでサポートされています。
TaskContext を使用してコンテキスト情報を取得する
タブを選択すると、 PySpark UDF または バッチ Unity Catalog Python UDF の TaskContext の例が表示されます。
- PySpark UDF
- Batch Unity Catalog Python UDF
次の PySpark UDF の例では、ユーザーのコンテキストを出力します。
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
次のバッチ Unity Catalog Python UDF 例では、サービスの資格情報を使用して AWS Lambda 関数を呼び出すためのユーザーの ID を取得します。
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
登録後に UDF を呼び出します。
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext プロパティ
TaskContext.getLocalProperty()
メソッドには、次のプロパティ キーがあります。
プロパティ キー | 説明 | 使用例 |
---|---|---|
| 現在 UDF を実行しているユーザー |
-> |
| 現在の UDF に関連付けられている Spark ジョブ グループ ID |
-> |
| JSONディクショナリの文字列表現として書式設定されたキーと値のペアとしてのメタデータタグのクラスタリング |
-> |
| ワークスペースが存在する地域 |
-> |
| 実行中のコンテキストの Databricks アカウント ID |
-> |
| ワークスペース ID (DBSQL では使用できません) |
-> |
| クラスタリングの Databricks Runtime バージョン (DBSQL 以外の環境) |
-> |
| DBSQL バージョン (DBSQL 環境上) |
-> |