Pular para o conteúdo principal

Funções de tabela definidas pelo usuário (UDTFs) do Python no Unity Catalog

info

Visualização

O registro de Python UDTFs no Unity Catalog está em visualização pública.

Uma função de tabela definida pelo usuário (UDTF) Unity Catalog registra funções que retornam tabelas completas em vez de valores escalares. Ao contrário das funções escalares que retornam um único valor de resultado de cada chamada, as UDTFs são invocadas na cláusula FROM de uma instrução SQL e podem retornar várias linhas e colunas.

Os UDTFs são particularmente úteis para:

  • Transformando matrizes ou estruturas de dados complexas em várias linhas
  • Integração do APIs ou serviço externo ao SQL fluxo de trabalho
  • Implementação de lógica personalizada de geração ou enriquecimento de dados
  • Processamento de dados que requerem operações com estado entre as linhas

Cada chamada UDTF aceita zero ou mais argumentos. Esses argumentos podem ser expressões escalares ou argumentos de tabela que representam tabelas de entrada inteiras.

Os UDTFs podem ser registrados de duas maneiras:

Requisitos

Unity Catalog Python Os UDTFs são compatíveis com os seguintes tipos de compute:

  • Notebook e Job sem servidor
  • Clássico compute com modo de acesso padrão (Databricks Runtime 17.1 e acima)
  • SQL warehouse (serverless ou profissional)

Criar um UDTF no Unity Catalog

Use SQL DDL para criar um UDTF governado no Unity Catalog. Os UDTFs são invocados usando a cláusula FROM de uma instrução SQL.

SQL
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

Output
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+

A Databricks implementa UDTFs Python como classes Python com um método obrigatório eval que produz linhas de saída.

Argumentos de tabela

nota

Os argumentos TABLE são suportados no Databricks Runtime 17.2 e acima.

UDTFs podem aceitar tabelas inteiras como argumentos de entrada, permitindo transformações e agregações complexas com estado.

Métodos de ciclo de vidaeval() e terminate()

Os argumentos de tabela em UDTFs utilizam as seguintes funções para processar cada linha:

  • eval(): Chamado uma vez para cada linha na tabela de entrada. Este é o principal método de processamento e é obrigatório.
  • terminate(): Chamado uma vez no final de cada partição, depois que todas as linhas foram processadas por eval(). Use este método para produzir resultados agregados finais ou executar operações de limpeza. Este método é opcional, mas essencial para operações com estado, como agregações, contagens ou processamento de lotes.

Para obter mais informações sobre os métodos eval() e terminate() , consulte a documentaçãoApache Spark : Python UDTF.

Padrões de acesso a linhas

eval() Recebe linhas dos argumentos TABLE como objetos PySpark.sql.Row . Você pode acessar os valores pelo nome da coluna (row['id'], row['name']) ou pelo índice (row[0], row[1]).

  • Flexibilidade de esquema : declare argumentos TABLE sem definições de esquema (por exemplo, data TABLE, t TABLE). A função aceita qualquer estrutura de tabela, então seu código deve validar se as colunas necessárias existem.

Veja Exemplo: Correspondência de endereços IP com blocos de rede CIDR e Exemplo: legendagem de imagens de lotes usando o ponto de extremidade de visão Databricks.

Calcular um esquema de saída dinâmico (UDTFs polimórficos)

nota

As UDTFs UC polimórficas requerem Databricks Runtime 18.1 ou superior.

Uma UDTF polimórfica determina seu esquema de saída dinamicamente no momento da consulta usando um método estático analyze() , em vez de declarar colunas antecipadamente. Para criar um, use RETURNS TABLE sem definições de coluna e defina um método analyze() na classe manipuladora.

O exemplo a seguir extrai campos especificados pelo chamador de uma string JSON , retornando colunas diferentes dependendo do argumento fields :

SQL
CREATE OR REPLACE FUNCTION extract_fields(json_str STRING, fields STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ExtractFields'
AS $$
class ExtractFields:
@staticmethod
def analyze(json_str, fields):

# Build the output schema from the requested field names
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult
col_names = [f.strip() for f in fields.value.split(",")]
return AnalyzeResult(
StructType([StructField(name, StringType()) for name in col_names])
)

def eval(self, json_str: str, fields: str):
# Parse the JSON and yield only the requested fields
import json
data = json.loads(json_str)
col_names = [f.strip() for f in fields.split(",")]
yield tuple(data.get(name) for name in col_names)
$$;

-- Extract the name and city
SELECT * FROM extract_fields(
'{"name": "Alice", "age": 30, "city": "Seattle"}',
'name, city'
);
Output
+-------+---------+
| name | city |
+-------+---------+
| Alice | Seattle |
+-------+---------+

Defina o método analyze

A classe manipuladora deve incluir um método @staticmethod chamado analyze que aceita os mesmos argumentos que o UDTF e retorna um AnalyzeResult descrevendo o esquema de saída. O Databricks chama analyze() no momento do planejamento da consulta para resolver o esquema antes de executar a função.

Cada parâmetro de analyze é uma instância da classe AnalyzeArgument :

campo

Descrição

dataType

O tipo do argumento de entrada é DataType. Para argumentos de tabela de entrada, isso é um StructType representando as colunas da tabela.

value

O valor do argumento de entrada é Optional[Any]. Isto é None para argumentos de tabela ou expressões não constantes.

isTable

Se o argumento de entrada for um argumento de tabela como BooleanType.

isConstantExpression

Se o argumento de entrada é uma expressão que pode ser dobrada como um BooleanType.

O método analyze retorna uma instância da classe AnalyzeResult :

campo

Descrição

schema

O esquema da tabela de resultados como um StructType.

withSinglePartition

Se True, envia todas as linhas de entrada para a mesma instância da classe UDTF.

partitionBy

Se não estiver vazio, divide as linhas de entrada pelas expressões especificadas, de forma que cada combinação única seja processada por uma instância UDTF separada.

orderBy

Se não estiver vazio, especifica a ordem das linhas dentro de cada partição.

select

Se não estiver vazio, especifica quais colunas do argumento TABLE de entrada a UDTF receberá.

atenção

Para UDTFs polimórficas Unity Catalog , você deve colocar todas as importações dentro do corpo do método analyze() . As importações de nível superior não estão disponíveis no ambiente sandbox Unity Catalog .

Estado direto de analyze para eval

O método analyze é executado uma vez no momento do planejamento da consulta, então você pode usá-lo para pré-processar argumentos constantes, analisar configurações ou construir pesquisas. Para encaminhar esses resultados para eval, crie uma subclasse @dataclass de AnalyzeResult com campos personalizados, retorne-a de analyze e aceite-a no método __init__ . Isso evita repetir um trabalho dispendioso para cada linha.

O exemplo a seguir resolve um código de idioma para um nome de idioma completo uma vez em analyze e o encaminha, para que eval possa tag cada linha sem repetir a pesquisa:

SQL
CREATE OR REPLACE FUNCTION tag_language(t TABLE, lang_code STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'TagLanguage'
AS $$
class TagLanguage:
@staticmethod
def analyze(t, lang_code):
from dataclasses import dataclass
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.udtf import AnalyzeResult

@dataclass
class LangResult(AnalyzeResult):
language: str = ""

# Resolve the language code to a full name once during planning
languages = {"en": "English", "es": "Spanish", "fr": "French", "de": "German"}
return LangResult(
schema=StructType([
StructField("text", StringType()),
StructField("language", StringType())
]),
language=languages.get(lang_code.value, "Unknown")
)

def __init__(self, result):
self._language = result.language

def eval(self, row, lang_code: str):
# Tag each row with the pre-resolved language name
yield (row['text'], self._language)
$$;

SELECT * FROM tag_language(
TABLE(VALUES ('Hola mundo'), ('Buenos días') t(text)),
'es'
);
Output
+-------------+----------+
| text | language |
+-------------+----------+
| Hola mundo | Spanish |
| Buenos días | Spanish |
+-------------+----------+

Para mais padrões e detalhes sobre encaminhamento de estado, consulte Encaminhar estado para chamadas futuras eval.

Especifique o particionamento a partir do método analyze

Quando uma UDTF polimórfica aceita um argumento de tabela, o método analyze pode controlar como as linhas de entrada são distribuídas entre as instâncias da UDTF definindo partitionBy, orderBy, withSinglePartition e select no AnalyzeResult. Isso elimina a necessidade de os chamadores especificarem PARTITION BY ou ORDER BY em SQL.

Para obter a API de particionamento completa e exemplos, consulte Especificar um particionamento das linhas de entrada do método analyze.

Isolamento ambiental

nota

Os ambientes de isolamento compartilhado exigem o Databricks Runtime 17.2 e o acima. Em versões anteriores, todos os Unity Catalog Python UDTFs executavam em modo de isolamento estrito.

Unity Catalog Python UDTFs com o mesmo proprietário e sessão podem compartilhar um ambiente de isolamento por meio do site default. Isso melhora o desempenho e reduz o uso de memória ao diminuir o número de ambientes separados que precisam ser iniciados.

Isolamento estrito

Para garantir que uma UDTF seja sempre executada em seu próprio ambiente totalmente isolado, adicione a cláusula característica STRICT ISOLATION.

A maioria dos UDTFs não precisa de isolamento estrito. Os UDTFs de processamento de dados padrão se beneficiam do ambiente de isolamento compartilhado default e executam mais rapidamente com menor consumo de memória.

Adicione a cláusula característica STRICT ISOLATION às uDTFs que:

  • execução de entrada como código usando eval(), exec(), ou funções semelhantes.
  • Grave arquivos no sistema de arquivos local.
  • Modifique as variáveis globais ou o estado do sistema.
  • Acessar ou modificar a variável de ambiente.

O exemplo de UDTF a seguir define uma variável de ambiente personalizada, lê a variável de volta e multiplica um conjunto de números usando a variável. Como a UDTF altera o ambiente do processo, a execução é feita em STRICT ISOLATION. Caso contrário, poderia vazar ou substituir a variável de ambiente para outros UDFs/UDTFs no mesmo ambiente, causando um comportamento incorreto.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor

# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))

# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

Defina DETERMINISTIC se sua função produzir resultados consistentes

Adicione DETERMINISTIC à sua definição de função se ela produzir as mesmas saídas para as mesmas entradas. Isso permite otimizações de consulta para melhorar o desempenho.

Por default, os lotes Unity Catalog Python UDTFs são considerados não determinísticos, a menos que sejam declarados explicitamente. Exemplos de funções não determinísticas incluem: gerar valores aleatórios, acessar horas ou datas atuais ou fazer chamadas de API externas.

Veja CREATE FUNCTION (SQL e Python).

Exemplos práticos

Os exemplos a seguir demonstram casos de uso do mundo real para UDTFs do Unity Catalog Python, progredindo de transformações de dados simples para integrações externas complexas.

Exemplo: Reimplementação explode

Embora o Spark forneça uma função explode integrada, a criação de sua própria versão demonstra o padrão UDTF fundamental de receber uma única entrada e produzir várias linhas de saída.

SQL
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;

Use a função diretamente em uma consulta SQL:

SQL
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
Output
+---------+
| element |
+---------+
| apple |
| banana |
| cherry |
+---------+

Ou aplique-o aos dados da tabela existente com uma joinLATERAL:

SQL
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

Exemplo: Geolocalização de endereço IP via API REST

Este exemplo demonstra como UDTFs podem integrar APIs externas diretamente no seu fluxo de trabalho SQL. O analista pode incorporar dados com chamadas de API em tempo real usando sintaxe SQL familiar, sem exigir processos ETL separados.

SQL
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
nota

Python Os UDTFs permitem o tráfego de rede TCP/UDP pelas portas 80, 443 e 53 ao usar serverless compute ou compute configurado com o modo de acesso padrão.

Use a função para enriquecer os dados da Web log com informações geográficas:

SQL
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

Essa abordagem permite a análise geográfica em tempo real sem a necessidade de tabelas de pesquisa pré-processadas ou um pipeline de dados separado. O UDTF lida com solicitações HTTP, análise de JSON e tratamento de erros, tornando as fontes de dados externas acessíveis por meio de consultas padrão em SQL.

Exemplo: comparar endereços IP com blocos de rede CIDR

Este exemplo demonstra a correspondência de endereços IP com blocos de rede CIDR, uma tarefa comum de engenharia de dados que requer lógica SQL complexa.

Primeiro, crie dados de amostra com endereços IPv4 e IPv6:

SQL
-- An example IP logs with both IPv4 and IPv6 addresses
CREATE OR REPLACE TEMPORARY VIEW ip_logs AS
VALUES
('log1', '192.168.1.100'),
('log2', '10.0.0.5'),
('log3', '172.16.0.10'),
('log4', '8.8.8.8'),
('log5', '2001:db8::1'),
('log6', '2001:db8:85a3::8a2e:370:7334'),
('log7', 'fe80::1'),
('log8', '::1'),
('log9', '2001:db8:1234:5678::1')
t(log_id, ip_address);

Em seguida, defina e registre o UDTF. Observe a estrutura de classes do Python:

  • O parâmetro t TABLE aceita uma tabela de entrada com qualquer esquema. A UDTF se adapta automaticamente para processar quaisquer colunas fornecidas. Essa flexibilidade significa que você pode usar a mesma função em diferentes tabelas sem modificar a assinatura da função. No entanto, você deve verificar cuidadosamente o esquema das linhas para garantir a compatibilidade.
  • O método __init__ é usado para configurações pesadas e únicas, como carregar uma grande lista de rede. Este processo é realizado uma vez por partição da tabela de entrada.
  • O método eval processa cada linha e contém a lógica principal de correspondência. Este método é executado exatamente uma vez para cada linha na partição de entrada, e cada execução é realizada pela instância correspondente da classe IpMatcher UDTF para essa partição.
  • A cláusula HANDLER especifica o nome da classe Python que implementa a lógica UDTF.
SQL
CREATE OR REPLACE TEMPORARY FUNCTION ip_cidr_matcher(t TABLE)
RETURNS TABLE(log_id STRING, ip_address STRING, network STRING, ip_version INT)
LANGUAGE PYTHON
HANDLER 'IpMatcher'
COMMENT 'Match IP addresses against a list of network CIDR blocks'
AS $$
class IpMatcher:
def __init__(self):
import ipaddress
# Heavy initialization - load networks once per partition
self.nets = []
cidrs = ['192.168.0.0/16', '10.0.0.0/8', '172.16.0.0/12',
'2001:db8::/32', 'fe80::/10', '::1/128']
for cidr in cidrs:
self.nets.append(ipaddress.ip_network(cidr))

def eval(self, row):
import ipaddress
# Validate that required fields exist
required_fields = ['log_id', 'ip_address']
for field in required_fields:
if field not in row:
raise ValueError(f"Missing required field: {field}")
try:
ip = ipaddress.ip_address(row['ip_address'])
for net in self.nets:
if ip in net:
yield (row['log_id'], row['ip_address'], str(net), ip.version)
return
yield (row['log_id'], row['ip_address'], None, ip.version)
except ValueError:
yield (row['log_id'], row['ip_address'], 'Invalid', None)
$$;

Agora que ip_cidr_matcher está registrado no Unity Catalog, chame-o diretamente do SQL usando a sintaxe TABLE() :

SQL
-- Process all IP addresses
SELECT
*
FROM
ip_cidr_matcher(t => TABLE(ip_logs))
ORDER BY
log_id;
Output
+--------+-------------------------------+-----------------+-------------+
| log_id | ip_address | network | ip_version |
+--------+-------------------------------+-----------------+-------------+
| log1 | 192.168.1.100 | 192.168.0.0/16 | 4 |
| log2 | 10.0.0.5 | 10.0.0.0/8 | 4 |
| log3 | 172.16.0.10 | 172.16.0.0/12 | 4 |
| log4 | 8.8.8.8 | null | 4 |
| log5 | 2001:db8::1 | 2001:db8::/32 | 6 |
| log6 | 2001:db8:85a3::8a2e:370:7334 | 2001:db8::/32 | 6 |
| log7 | fe80::1 | fe80::/10 | 6 |
| log8 | ::1 | ::1/128 | 6 |
| log9 | 2001:db8:1234:5678::1 | 2001:db8::/32 | 6 |
+--------+-------------------------------+-----------------+-------------+

Exemplo: legendas de imagens de lotes usando o endpoint de visão Databricks

Este exemplo demonstra a legendagem de lotes de imagens usando um endpoint do modelo de visão Databricks . Ele demonstra o uso de terminate() para processamento de lotes e execução baseada em partição.

  1. Crie uma tabela com URLs de imagens públicas:

    SQL
    CREATE OR REPLACE TEMPORARY VIEW sample_images AS
    VALUES
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg', 'scenery'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/a/a7/Camponotus_flavomarginatus_ant.jpg/1024px-Camponotus_flavomarginatus_ant.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/1/15/Cat_August_2010-4.jpg/1200px-Cat_August_2010-4.jpg', 'animals'),
    ('https://upload.wikimedia.org/wikipedia/commons/thumb/c/c5/M101_hires_STScI-PRC2006-10a.jpg/1024px-M101_hires_STScI-PRC2006-10a.jpg', 'scenery')
    images(image_url, category);
  2. Crie uma UDTF ( Python Unity Catalog para gerar legendas de imagens:

    1. Inicialize a UDTF com a configuração, incluindo o tamanho dos lotes, os tokens API Databricks , endpoint do modelo de visão e a URL workspace .
    2. No método eval , colete os URLs das imagens em um buffer. Quando o buffer atingir o tamanho dos lotes, inicie o processamento dos lotes. Isso garante que várias imagens sejam processadas juntas em uma única chamada de API, em vez de chamadas individuais para cada imagem.
    3. No método de processamento em lotes, download todas as imagens armazenadas em buffer, codifique-as em base64 e envie-as em uma única solicitação API para o Databricks VisionModel. O modelo processa todas as imagens simultaneamente e retorna legendas para todos os lotes.
    4. O método terminate é executado exatamente uma vez no final de cada partição. No método de término, processe todas as imagens restantes no buffer e retorne todas as legendas coletadas como resultado.
nota

Substitua <workspace-url> pelo URL real do seu workspace Databricks (https://your-workspace.cloud.databricks.com).

SQL
CREATE OR REPLACE TEMPORARY FUNCTION batch_inference_image_caption(data TABLE, api_token STRING)
RETURNS TABLE (caption STRING)
LANGUAGE PYTHON
HANDLER 'BatchInferenceImageCaption'
COMMENT 'batch image captioning by sending groups of image URLs to a Databricks vision endpoint and returning concise captions for each image.'
AS $$
class BatchInferenceImageCaption:
def __init__(self):
self.batch_size = 3
self.vision_endpoint = "databricks-claude-sonnet-4-5"
self.workspace_url = "<workspace-url>"
self.image_buffer = []
self.results = []

def eval(self, row, api_token):
self.image_buffer.append((str(row[0]), api_token))
if len(self.image_buffer) >= self.batch_size:
self._process_batch()

def terminate(self):
if self.image_buffer:
self._process_batch()
for caption in self.results:
yield (caption,)

def _process_batch(self):
batch_data = self.image_buffer.copy()
self.image_buffer.clear()

import base64
import httpx
import requests

# API request timeout in seconds
api_timeout = 60
# Maximum tokens for vision model response
max_response_tokens = 300
# Temperature controls randomness (lower = more deterministic)
model_temperature = 0.3

# create a batch for the images
batch_images = []
api_token = batch_data[0][1] if batch_data else None

for image_url, _ in batch_data:
image_response = httpx.get(image_url, timeout=15)
image_data = base64.standard_b64encode(image_response.content).decode("utf-8")
batch_images.append(image_data)

content_items = [{
"type": "text",
"text": "Provide brief captions for these images, one per line."
}]
for img_data in batch_images:
content_items.append({
"type": "image_url",
"image_url": {
"url": "data:image/jpeg;base64," + img_data
}
})

payload = {
"messages": [{
"role": "user",
"content": content_items
}],
"max_tokens": max_response_tokens,
"temperature": model_temperature
}

response = requests.post(
self.workspace_url + "/serving-endpoints/" +
self.vision_endpoint + "/invocations",
headers={
'Authorization': 'Bearer ' + api_token,
'Content-Type': 'application/json'
},
json=payload,
timeout=api_timeout
)

result = response.json()
batch_response = result['choices'][0]['message']['content'].strip()

lines = batch_response.split('\n')
captions = [line.strip() for line in lines if line.strip()]

while len(captions) < len(batch_data):
captions.append(batch_response)

self.results.extend(captions[:len(batch_data)])
$$;

Para usar a UDTF de legenda de imagem de lotes, chame-a usando a tabela de imagens de exemplo:

nota

Substitua your_secret_scope e api_token pelo Escopo Secreto e nome key reais dos tokens API Databricks .

SQL
SELECT
caption
FROM
batch_inference_image_caption(
data => TABLE(sample_images),
api_token => secret('your_secret_scope', 'api_token')
)
Output
+---------------------------------------------------------------------------------------------------------------+
| caption |
+---------------------------------------------------------------------------------------------------------------+
| Wooden boardwalk cutting through vibrant wetland grasses under blue skies |
| Black ant in detailed macro photography standing on a textured surface |
| Tabby cat lounging comfortably on a white ledge against a white wall |
| Stunning spiral galaxy with bright central core and sweeping blue-white arms against the black void of space. |
+---------------------------------------------------------------------------------------------------------------+

Você também pode gerar legendas de imagens categoria por categoria:

SQL
SELECT
*
FROM
batch_inference_image_caption(
TABLE(sample_images)
PARTITION BY category ORDER BY (category),
secret('your_secret_scope', 'api_token')
)
Output
+------------------------------------------------------------------------------------------------------+
| caption |
+------------------------------------------------------------------------------------------------------+
| Black ant in detailed macro photography standing on a textured surface |
| Stunning spiral galaxy with bright center and sweeping blue-tinged arms against the black of space. |
| Tabby cat lounging comfortably on white ledge against white wall |
| Wooden boardwalk cutting through lush wetland grasses under blue skies |
+------------------------------------------------------------------------------------------------------+

Exemplo: Cálculo da curva ROC e da AUC para avaliação de modelos de ML

Este exemplo demonstra o cálculo de curvas ROC (Receiver Operating Characteristic) e da área sob a curva (AUC) para avaliação de modelos de classificação binária usando o scikit-learn.

Este exemplo demonstra vários padrões importantes:

  • Utilização de biblioteca externa : Integra o scikit-learn para o cálculo da curva ROC.
  • Agregação com estado : acumula previsões em todas as linhas antes de calcular as métricas.
  • Uso do métodoterminate() : Processa o dataset completo e retorna resultados somente após todas as linhas terem sido avaliadas.
  • Tratamento de erros : Valida se as colunas necessárias existem na tabela de entrada.

O UDTF acumula todas as previsões na memória usando o método eval() , depois calcula e produz a curva ROC completa no método terminate() . Esse padrão é útil para métricas que exigem o dataset completo para o cálculo.

SQL
CREATE OR REPLACE TEMPORARY FUNCTION compute_roc_curve(t TABLE)
RETURNS TABLE (threshold DOUBLE, true_positive_rate DOUBLE, false_positive_rate DOUBLE, auc DOUBLE)
LANGUAGE PYTHON
HANDLER 'ROCCalculator'
COMMENT 'Compute ROC curve and AUC using scikit-learn'
AS $$
class ROCCalculator:
def __init__(self):
from sklearn import metrics
self._roc_curve = metrics.roc_curve
self._roc_auc_score = metrics.roc_auc_score

self._true_labels = []
self._predicted_scores = []

def eval(self, row):
if 'y_true' not in row or 'y_score' not in row:
raise KeyError("Required columns 'y_true' and 'y_score' not found")

true_label = row['y_true']
predicted_score = row['y_score']

label = float(true_label)
self._true_labels.append(label)
self._predicted_scores.append(float(predicted_score))

def terminate(self):
false_pos_rate, true_pos_rate, thresholds = self._roc_curve(
self._true_labels,
self._predicted_scores,
drop_intermediate=False
)

auc_score = float(self._roc_auc_score(self._true_labels, self._predicted_scores))

for threshold, tpr, fpr in zip(thresholds, true_pos_rate, false_pos_rate):
yield float(threshold), float(tpr), float(fpr), auc_score
$$;

Crie dados de classificação binária de exemplo com previsões:

SQL
CREATE OR REPLACE TEMPORARY VIEW binary_classification_data AS
SELECT *
FROM VALUES
( 1, 1.0, 0.95, 'high_confidence_positive'),
( 2, 1.0, 0.87, 'high_confidence_positive'),
( 3, 1.0, 0.82, 'medium_confidence_positive'),
( 4, 0.0, 0.78, 'false_positive'),
( 5, 1.0, 0.71, 'medium_confidence_positive'),
( 6, 0.0, 0.65, 'false_positive'),
( 7, 0.0, 0.58, 'true_negative'),
( 8, 1.0, 0.52, 'low_confidence_positive'),
( 9, 0.0, 0.45, 'true_negative'),
(10, 0.0, 0.38, 'true_negative'),
(11, 1.0, 0.31, 'low_confidence_positive'),
(12, 0.0, 0.15, 'true_negative'),
(13, 0.0, 0.08, 'high_confidence_negative'),
(14, 0.0, 0.03, 'high_confidence_negative')
AS data(sample_id, y_true, y_score, prediction_type);

Calcule a curva ROC e a AUC:

SQL
SELECT
threshold,
true_positive_rate,
false_positive_rate,
auc
FROM compute_roc_curve(
TABLE(
SELECT y_true, y_score
FROM binary_classification_data
WHERE y_true IS NOT NULL AND y_score IS NOT NULL
ORDER BY sample_id
)
)
ORDER BY threshold DESC;
Output
+-----------+---------------------+----------------------+-------+
| threshold | true_positive_rate | false_positive_rate | auc |
+-----------+---------------------+----------------------+-------+
| 1.95 | 0.0 | 0.0 | 0.786 |
| 0.95 | 0.167 | 0.0 | 0.786 |
| 0.87 | 0.333 | 0.0 | 0.786 |
| 0.82 | 0.5 | 0.0 | 0.786 |
| 0.78 | 0.5 | 0.125 | 0.786 |
| 0.71 | 0.667 | 0.125 | 0.786 |
| 0.65 | 0.667 | 0.25 | 0.786 |
| 0.58 | 0.667 | 0.375 | 0.786 |
| 0.52 | 0.833 | 0.375 | 0.786 |
| 0.45 | 0.833 | 0.5 | 0.786 |
| 0.38 | 0.833 | 0.625 | 0.786 |
| 0.31 | 1.0 | 0.625 | 0.786 |
| 0.15 | 1.0 | 0.75 | 0.786 |
| 0.08 | 1.0 | 0.875 | 0.786 |
| 0.03 | 1.0 | 1.0 | 0.786 |
+-----------+---------------------+----------------------+-------+

Exemplo: Projeção dinâmica de colunas a partir de um argumento de tabela

Este exemplo combina UDTFs polimórficas com argumentos de tabela. A UDTF aceita uma tabela e uma lista de nomes de colunas separados por vírgulas e, em seguida, projeta apenas essas colunas da entrada. O método analyze inspeciona o esquema da tabela de entrada e constrói um esquema de saída contendo apenas as colunas solicitadas.

SQL
CREATE OR REPLACE FUNCTION project_columns(t TABLE, columns STRING)
RETURNS TABLE
LANGUAGE PYTHON
HANDLER 'ProjectColumns'
AS $$
class ProjectColumns:
@staticmethod
def analyze(t, columns):
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeResult

requested = [c.strip() for c in columns.value.split(",")]
input_schema = t.dataType
output_fields = []
for field in input_schema.fields:
if field.name in requested:
output_fields.append(field)
if not output_fields:
raise ValueError(
f"None of the requested columns {requested} "
f"exist in the input table"
)
return AnalyzeResult(schema=StructType(output_fields))

def eval(self, row, columns: str):
requested = [c.strip() for c in columns.split(",")]
yield tuple(row[col] for col in requested if col in row)
$$;

Utilize a função para selecionar colunas específicas de uma tabela:

SQL
SELECT * FROM project_columns(
TABLE(SELECT * FROM samples.nyctaxi.trips LIMIT 5),
'pickup_zip, dropoff_zip, fare_amount'
);

Limitações

As seguintes limitações se aplicam aos UDTFs do Unity Catalog Python:

Próximos passos