lotes Python Funções definidas pelo usuário (UDFs) em Unity Catalog
Visualização
Esse recurso está em Public Preview.
lotes Unity Catalog Python As UDFs ampliam os recursos das UDFs Unity Catalog, permitindo que o senhor escreva o código Python para operar em lotes de dados, melhorando significativamente a eficiência ao reduzir a sobrecarga associada às UDFs linha a linha. Essas otimizações tornam os UDFs do Unity Catalog lotes Python ideais para o processamento de dados de grandes escalas.
Requisitos
lotes Unity Catalog Python UDFs exigem Databricks Runtime versões 16.3 e acima.
Criar lotes Unity Catalog Python UDF
A criação de um lote Unity Catalog Python UDF é semelhante à criação de um lote normal Unity Catalog UDF, com as seguintes adições:
PARAMETER STYLE PANDAS
: Isso especifica que o site UDF processa dados em lotes usando iteradores Pandas.HANDLER 'handler_function'
: Especifica a função do manipulador que é chamada para processar os lotes.
O exemplo a seguir mostra como criar um lote Unity Catalog Python UDF:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
Após registrar o UDF, o senhor pode chamá-lo usando SQL ou Python:
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
lotes UDF handler function
lotes Unity Catalog Python Os UDFs exigem uma função de tratamento que processe os lotes e produza resultados. O senhor deve especificar o nome da função do manipulador ao criar o UDF usando o HANDLER
key.
A função de manipulador faz o seguinte:
- Aceita um argumento iterador que itera sobre um ou mais
pandas.Series
. Cadapandas.Series
contém os parâmetros de entrada do UDF. - Itera sobre o gerador e processa os dados.
- Retorna um iterador gerador.
lotes Unity Catalog Python Os UDFs devem retornar o mesmo número de linhas que a entrada. A função handler garante isso produzindo um pandas.Series
com o mesmo comprimento da série de entrada para cada lote.
Instale dependências personalizadas
O senhor pode estender a funcionalidade dos lotes Unity Catalog Python UDFs para além do ambiente Databricks Runtime, definindo dependências personalizadas para bibliotecas externas.
Consulte Estender UDFs usando dependências personalizadas.
lotes UDFs podem aceitar parâmetros únicos ou múltiplos
Parâmetro único: Quando a função de manipulador usa um único parâmetro de entrada, ela recebe um iterador que itera sobre um pandas.Series
para cada lote.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
Vários parâmetros: para vários parâmetros de entrada, a função manipuladora recebe um iterador que itera em vários pandas.Series
. Os valores na série estão na mesma ordem dos parâmetros de entrada.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
Otimizar o desempenho separando operações caras
O senhor pode otimizar as operações computacionalmente caras separando-as da função de tratamento. Isso garante que eles sejam executados apenas uma vez, e não durante cada iteração em lotes de dados.
O exemplo a seguir mostra como garantir que uma computação cara seja executada somente uma vez:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
credenciais de serviço em lotes Unity Catalog Python UDFs
Os lotes Unity Catalog Python UDFs podem usar as Credenciais de serviço Unity Catalog para acessar o serviço de nuvem externo. Isso é particularmente útil para integrar funções de nuvem, como tokenizadores de segurança, ao fluxo de trabalho de processamento de dados.
Para criar uma credencial de serviço, consulte Gerenciar o acesso ao serviço de nuvem externo usando credenciais de serviço.
Especifique a credencial de serviço que o senhor deseja usar na cláusula CREDENTIALS
na definição do UDF:
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
Permissões de credenciais de serviço
O criador do UDF deve ter permissão ACCESS
na credencial de serviço do Unity Catalog. No entanto, para os chamadores de UDF, basta conceder-lhes a permissão EXECUTE
no UDF. Em particular, os chamadores de UDF não precisam acessar a credencial do serviço subjacente, porque o UDF é executado usando as permissões de credencial do criador do UDF.
Para funções temporárias, o criador é sempre o invocador. Os UDFs que são executados no escopo No-PE , também conhecidos como clustering dedicado, usam as permissões do chamador.
credenciais e aliases padrão
Você pode incluir várias credenciais na cláusula CREDENTIALS
, mas somente uma pode ser marcada como DEFAULT
. O senhor pode criar um alias para credenciais que não sejamdefault usando a palavra-chave AS
. Cada credencial deve ter um alias exclusivo.
Os SDKs de nuvem corrigidos captam automaticamente as credenciais do default. A default credencial tem precedência sobre qualquer default especificada na compute Spark configuração do e persiste na Unity Catalog UDF definição .
%sql
CREATE OR REPLACE TEMPORARY FUNCTION call_lambda_func(data STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import google.auth # this import is always needed to trigger the SDK monkeypatching
from google.cloud import storage
# the client automatically uses the default credential 'batch-udf-service-creds-example-cred'
client = storage.Client(project='your-project')
# you can also get the token explicitly from the default provider
from google.auth import default
credentials, _ = default()
token = credentials.token
def batchhandler(it):
# use client
buckets = client.Buckets()
for vals in data:
yield vals
$$
Exemplo de credencial de serviço - função AWS Lambda
O exemplo a seguir usa uma credencial de serviço para chamar uma função AWS Lambda de um lote Unity Catalog Python UDF:
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
Chame o UDF depois que ele for registrado:
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
Obter o contexto de execução da tarefa
Use o TaskContext PySpark API para obter informações de contexto, como a identidade do usuário, a tag do cluster, o ID do spark job e muito mais. Consulte Obter contexto de tarefa em um UDF.
Limitações
- Python devem tratar os valores de
NULL
de forma independente, e todos os mapeamentos de tipos devem seguir os mapeamentos de linguagem de Databricks SQL. - Os lotes Unity Catalog Python UDFs são executados em um ambiente seguro e isolado e não têm acesso a um sistema de arquivos compartilhado ou a um serviço interno.
- Várias invocações do UDF em um estágio são serializadas e os resultados intermediários são materializados e podem ser transferidos para o disco.
- As credenciais de serviço estão disponíveis somente em lotes Unity Catalog Python UDFs. Elas não são compatíveis com as UDFs padrão do Python do Unity Catalog ou com as UDFs do PySpark.
- Em clustering dedicado e para funções temporárias, o chamador da função deve ter permissões
ACCESS
nas credenciais do serviço. Consulte Conceder permissões para usar uma credencial de serviço para acessar um serviço de nuvem externo.