Obter contexto de tarefa em um UDF
Use o TaskContext PySpark API para obter informações de contexto durante a execução de um lote Unity Catalog Python UDF ou PySpark UDF.
Por exemplo, informações de contexto, como a identidade do usuário e a Cluster Tag, podem verificar a identidade de um usuário para acessar um serviço externo.
Requisitos
-
O TaskContext é compatível com as versões 16.3 e superiores do site Databricks Runtime.
-
TaskContext é compatível com os seguintes tipos de UDF:
Usar TaskContext para obter informações de contexto
Selecione um tab para ver exemplos de TaskContext para PySpark UDFs ou lotes Unity Catalog Python UDFs .
- PySpark UDF
- Batch Unity Catalog Python UDF
O exemplo de UDF do PySpark a seguir imprime o contexto do usuário:
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
O exemplo de lotes Unity Catalog Python UDF a seguir obtém a identidade do usuário para chamar uma função AWS Lambda usando uma credencial de serviço:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Chame o UDF depois que ele for registrado:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Propriedades do TaskContext
O método TaskContext.getLocalProperty()
tem a seguinte chave de propriedade:
Chave de propriedade | Descrição | Exemplo de uso |
---|---|---|
| O usuário que está executando o UDF no momento |
- > |
| O Spark ID do grupo de trabalho associado ao atual UDF |
- > |
| Agrupamento de tags de metadados como par key-valor formatado como uma representação de strings de um dicionário JSON |
- > |
| A região onde o workspace reside |
- > |
| Databricks account ID do contexto de execução |
- > |
| ID do espaço de trabalho (não disponível no DBSQL) |
- > |
| Databricks Runtime versão para o clustering (em ambientes não-DBSQL) |
- > |
| Versão DBSQL (em ambientes DBSQL) |
- > |