Pular para o conteúdo principal

Funções de tabela definidas pelo usuário (UDTFs) do Python no Unity Catalog

info

Visualização

O registro de Python UDTFs no Unity Catalog está em visualização pública.

Uma função de tabela definida pelo usuário (UDTF) Unity Catalog registra funções que retornam tabelas completas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, as UDTFs são invocadas na cláusula FROM de uma instrução SQL e podem retornar várias linhas e colunas.

Os UDTFs são particularmente úteis para:

  • Transformando matrizes ou estruturas de dados complexas em várias linhas
  • Integração do APIs ou serviço externo ao SQL fluxo de trabalho
  • Implementação de lógica personalizada de geração ou enriquecimento de dados
  • Processamento de dados que requerem operações com estado entre as linhas

Cada chamada UDTF aceita zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.

Os UDTFs podem ser registrados de duas maneiras:

Requisitos

Unity Catalog Python Os UDTFs são compatíveis com os seguintes tipos de compute:

  • Clássico compute com modo de acesso padrão (Databricks Runtime 17.1 e acima)
  • SQL warehouse (serverless ou profissional)

Criar um UDTF no Unity Catalog

Use SQL DDL para criar um UDTF governado no Unity Catalog. Os UDTFs são invocados usando a cláusula FROM de uma instrução SQL.

SQL
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

Output
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+

A Databricks implementa UDTFs Python como classes Python com um método obrigatório eval que produz linhas de saída.

Argumentos de tabela

nota

Os argumentos TABLE são suportados no Databricks Runtime 17.2 e acima.

UDTFs podem aceitar tabelas inteiras como argumentos de entrada, permitindo transformações e agregações complexas com estado.

Métodos de ciclo de vidaeval() e terminate()

Os argumentos de tabela em UDTFs utilizam as seguintes funções para processar cada linha:

  • eval(): Chamado uma vez para cada linha na tabela de entrada. Este é o principal método de processamento e é obrigatório.
  • terminate(): Chamado uma vez no final de cada partição, depois que todas as linhas foram processadas por eval(). Use este método para produzir resultados agregados finais ou executar operações de limpeza. Este método é opcional, mas essencial para operações com estado, como agregações, contagens ou processamento de lotes.

Para obter mais informações sobre os métodos eval() e terminate() , consulte a documentaçãoApache Spark : Python UDTF.

Padrões de acesso a linhas

eval() Recebe linhas dos argumentos TABLE como objetos PySpark.sql.Row . Você pode acessar os valores pelo nome da coluna (row['id'], row['name']) ou pelo índice (row[0], row[1]).

  • Flexibilidade de esquema : declare argumentos TABLE sem definições de esquema (por exemplo, data TABLE, t TABLE). A função aceita qualquer estrutura de tabela, então seu código deve validar se as colunas necessárias existem.

Veja Exemplo: Correspondência de endereços IP com blocos de rede CIDR e Exemplo: legendagem de imagens de lotes usando o ponto de extremidade de visão Databricks.

Isolamento ambiental

nota

Os ambientes de isolamento compartilhado exigem o Databricks Runtime 17.2 e o acima. Em versões anteriores, todos os Unity Catalog Python UDTFs executavam em modo de isolamento estrito.

Unity Catalog Python UDTFs com o mesmo proprietário e sessão podem compartilhar um ambiente de isolamento por meio do site default. Isso melhora o desempenho e reduz o uso de memória ao diminuir o número de ambientes separados que precisam ser iniciados.

Isolamento estrito

Para garantir que uma UDTF seja sempre executada em seu próprio ambiente totalmente isolado, adicione a cláusula característica STRICT ISOLATION.

A maioria dos UDTFs não precisa de isolamento estrito. Os UDTFs de processamento de dados padrão se beneficiam do ambiente de isolamento compartilhado default e executam mais rapidamente com menor consumo de memória.

Adicione a cláusula característica STRICT ISOLATION às uDTFs que:

  • execução de entrada como código usando eval(), exec(), ou funções semelhantes.
  • Grave arquivos no sistema de arquivos local.
  • Modifique as variáveis globais ou o estado do sistema.
  • Acessar ou modificar a variável de ambiente.

O exemplo de UDTF a seguir define uma variável de ambiente personalizada, lê a variável de volta e multiplica um conjunto de números usando a variável. Como a UDTF altera o ambiente do processo, a execução é feita em STRICT ISOLATION. Caso contrário, poderia vazar ou substituir a variável de ambiente para outros UDFs/UDTFs no mesmo ambiente, causando um comportamento incorreto.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor

# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))

# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Defina DETERMINISTIC se sua função produzir resultados consistentes

Adicione DETERMINISTIC à sua definição de função se ela produzir as mesmas saídas para as mesmas entradas. Isso permite otimizações de consulta para melhorar o desempenho.

Por default, os lotes Unity Catalog Python UDTFs são considerados não determinísticos, a menos que sejam declarados explicitamente. Exemplos de funções não determinísticas incluem: gerar valores aleatórios, acessar horas ou datas atuais ou fazer chamadas de API externas.

Veja CREATE FUNCTION (SQL e Python).

Exemplos práticos

Os exemplos a seguir demonstram casos de uso do mundo real para UDTFs do Unity Catalog Python, progredindo de transformações de dados simples para integrações externas complexas.

Exemplo: Reimplementação explode

Embora o Spark forneça uma função explode integrada, a criação de sua própria versão demonstra o padrão UDTF fundamental de receber uma única entrada e produzir várias linhas de saída.

SQL
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;

Use a função diretamente em uma consulta SQL:

SQL
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
Output
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+

Ou aplique-o aos dados da tabela existente com uma joinLATERAL:

SQL
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Exemplo: Geolocalização de endereço IP via API REST

Este exemplo demonstra como UDTFs podem integrar APIs externas diretamente no seu fluxo de trabalho SQL. O analista pode incorporar dados com chamadas de API em tempo real usando sintaxe SQL familiar, sem exigir processos ETL separados.

SQL
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
nota

Python Os UDTFs permitem o tráfego de rede TCP/UDP pelas portas 80, 443 e 53 ao usar serverless compute ou compute configurado com o modo de acesso padrão.

Use a função para enriquecer os dados da Web log com informações geográficas:

SQL
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Essa abordagem permite a análise geográfica em tempo real sem a necessidade de tabelas de pesquisa pré-processadas ou um pipeline de dados separado. O UDTF lida com solicitações HTTP, análise de JSON e tratamento de erros, tornando as fontes de dados externas acessíveis por meio de consultas padrão em SQL.

Exemplo: comparar endereços IP com blocos de rede CIDR

Este exemplo demonstra a correspondência de endereços IP com blocos de rede CIDR, uma tarefa comum de engenharia de dados que requer lógica SQL complexa.

Primeiro, crie dados de amostra com endereços IPv4 e IPv6:

SQL
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Em seguida, defina e registre o UDTF. Observe a estrutura de classes do Python:

  • O parâmetro t TABLE aceita uma tabela de entrada com qualquer esquema. A UDTF se adapta automaticamente para processar quaisquer colunas fornecidas. Essa flexibilidade significa que você pode usar a mesma função em diferentes tabelas sem modificar a assinatura da função. No entanto, você deve verificar cuidadosamente o esquema das linhas para garantir a compatibilidade.
  • O método __init__ é usado para configurações pesadas e únicas, como carregar uma grande lista de rede. Este processo é realizado uma vez por partição da tabela de entrada.
  • O método eval processa cada linha e contém a lógica principal de correspondência. Este método é executado exatamente uma vez para cada linha na partição de entrada, e cada execução é realizada pela instância correspondente da classe IpMatcher UDTF para essa partição.
  • A cláusula HANDLER especifica o nome da classe Python que implementa a lógica UDTF.
SQL
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))

def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Agora que ip_cidr_matcher está registrado no Unity Catalog, chame-o diretamente do SQL usando a sintaxe TABLE() :

SQL
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
Output
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+

Exemplo: legendas de imagens de lotes usando o endpoint de visão Databricks

Este exemplo demonstra a legendagem de lotes de imagens usando um endpoint do modelo de visão Databricks . Ele demonstra o uso de terminate() para processamento de lotes e execução baseada em partição.

  1. Crie uma tabela com URLs de imagens públicas:

    SQL
    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
  2. Crie uma UDTF ( Python Unity Catalog para gerar legendas de imagens:

    1. Inicialize a UDTF com a configuração, incluindo o tamanho dos lotes, os tokens API Databricks , endpoint do modelo de visão e a URL workspace .
    2. No método eval , colete os URLs das imagens em um buffer. Quando o buffer atingir o tamanho dos lotes, inicie o processamento dos lotes. Isso garante que várias imagens sejam processadas juntas em uma única chamada de API, em vez de chamadas individuais para cada imagem.
    3. No método de processamento em lotes, download todas as imagens armazenadas em buffer, codifique-as em base64 e envie-as em uma única solicitação API para o Databricks VisionModel. O modelo processa todas as imagens simultaneamente e retorna legendas para todos os lotes.
    4. O método terminate é executado exatamente uma vez no final de cada partição. No método de término, processe todas as imagens restantes no buffer e retorne todas as legendas coletadas como resultado.
nota

Substitua <workspace-url> pelo URL real do seu workspace Databricks (https://your-workspace.cloud.databricks.com).

SQL
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-3-7-sonnet"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []

def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()

def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)

def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()

import base64
import httpx
import requests

# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3

# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None

for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)

content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})

payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}

response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)

result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()

lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]

while len(captions) < len(batch_data):
captions.append(batch_response)

self.results.extend(captions[:len(batch_data)])
$$;

Para usar a UDTF de legenda de imagem de lotes, chame-a usando a tabela de imagens de exemplo:

nota

Substitua your_secret_scope e api_token pelo Escopo Secreto e nome key reais dos tokens API Databricks .

SQL
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
Output
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

Você também pode gerar legendas de imagens categoria por categoria:

SQL
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
Output
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+

Exemplo: Cálculo da curva ROC e da AUC para avaliação de modelos de ML

Este exemplo demonstra o cálculo de curvas ROC (Receiver Operating Characteristic) e da área sob a curva (AUC) para avaliação de modelos de classificação binária usando o scikit-learn.

Este exemplo demonstra vários padrões importantes:

  • Utilização de biblioteca externa : Integra o scikit-learn para o cálculo da curva ROC.
  • Agregação com estado : acumula previsões em todas as linhas antes de calcular as métricas.
  • Uso do métodoterminate() : Processa o dataset completo e retorna resultados somente após todas as linhas terem sido avaliadas.
  • Tratamento de erros : Valida se as colunas necessárias existem na tabela de entrada.

O UDTF acumula todas as previsões na memória usando o método eval() , depois calcula e produz a curva ROC completa no método terminate() . Esse padrão é útil para métricas que exigem o dataset completo para o cálculo.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score

self._true_labels = []
self._predicted_scores = []

def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")

true_label = row['y_true']
predicted_score = row['y_score']

label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))

def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)

auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Crie dados de classificação binária de exemplo com previsões:

SQL
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Calcule a curva ROC e a AUC:

SQL
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
Output
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+

Limitações

As seguintes limitações se aplicam aos UDTFs do Unity Catalog Python:

Próximos passos