Pular para o conteúdo principal

Tutorial: clusteringK-means

info

Visualização

Este recurso está em Pré-visualização Pública.

Este tutorial mostra como construir um operador de função de tabela definida pelo usuário (UDTF) Python para LakeFlow Designer que executa clustering K-means com scikit-learn. As UDTFs são muito adequadas para tarefas machine learning que processam conjuntos de dados completos. Para obter informações básicas sobre operadores definidos pelo usuário, consulte Operadores definidos pelo usuário no LakeFlow Designer.

Visão geral

Este tutorial mostra passo a passo como criar um operador definido pelo usuário (UDTF) usando Python. O operador realiza clustering K-Means nas colunas selecionadas, permitindo aos usuários:

  • Escolha quais colunas usar como recurso.
  • Especifique o número de clusters.
  • Obtenha uma tabela com as atribuições de cluster para cada linha.

Passo 1: Compreender o padrão de manipulador UDTF

Uma UDTF é implementada como uma classe Python com dois métodos key :

  • eval(row, ...) — Chamado para cada linha de entrada para acumular dados
  • terminate() — Chamado após todas as linhas serem processadas para gerar resultados

Esse padrão permite que a UDTF:

  1. Coletar todos os pontos de dados durante as chamadas eval()
  2. ensinar o modelo K-Means em terminate()
  3. Obtenha resultados agrupados linha por linha.
Python
class SklearnKMeans:
def __init__(self):
self.id_col = None
self.feature_cols = None
self.k = None
self.rows = []
self.features = []

def eval(self, row, id_column, columns, k):
"""Called one time per input row - accumulate data here."""
# Initialize configuration on first row
if self.id_col is None:
self.id_col = id_column
if self.feature_cols is None:
self.feature_cols = columns
if self.k is None:
self.k = max(1, int(k))

# Convert row to dictionary and store
row_dict = row.asDict(recursive=False)
self.rows.append(row_dict)

# Extract numeric features
feats = []
for c in self.feature_cols:
v = row_dict.get(c)
if v is None:
v = 0.0
feats.append(float(v))
self.features.append(feats)

def terminate(self):
"""Called after all rows - train model and yield results."""
import numpy as np
from sklearn.cluster import KMeans

if not self.rows:
return

X = np.asarray(self.features, dtype=float)
n_samples = X.shape[0]
n_clusters = min(self.k, n_samples)

model = KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=42
)
labels = model.fit_predict(X)

# Yield results row by row
for row_dict, label in zip(self.rows, labels):
yield str(row_dict[self.id_col]), int(label)
nota

O parâmetro row em eval() é um objeto Row do PySpark. Use .asDict() para convertê-lo em um dicionário para facilitar o acesso.

o passo 2: Crie o YAML para o operador

A configuração YAML define como o operador aparece no LakeFlow Designer. Para esta operadora:

  • Parâmetro numérico (k): Número de clusters a serem criados
  • Widget de seleção (id_column): menu suspenso preenchido com colunas da tabela de entrada
  • Widget de seleção múltipla (columns): Seleção de várias colunas de recursos
  • optionsSource : Preenche automaticamente a lista suspensa a partir do esquema da tabela de entrada
  • Porta de entrada : Especifica que este operador aceita dados tabulares.
YAML
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: '1.0.0'
description: Perform K-Means clustering on selected columns
config:
type: object
properties:
k:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
id_column:
type: string
title: ID Column
x-ui:
widget: select
optionsSource:
type: inputColumns
port: input_data
columns:
type: array
items:
type: string
title: Feature Columns
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
required:
- k
- id_column
- columns
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: output
title: Clustered Data

Consulte a referência YAML do operador definido pelo usuário para obter um guia completo de todas as propriedades, tipos de dados, widgets e opções disponíveis.

o passo 3: Criar a função Unity Catalog

Combine a configuração YAML e a classe manipuladora Python em uma única declaração CREATE FUNCTION .

SQL
CREATE OR REPLACE FUNCTION main.my_schema.k_means(
input_data TABLE,
id_column STRING,
columns ARRAY<STRING>,
k INT
)
RETURNS TABLE (
id STRING,
cluster_id INT
)
LANGUAGE PYTHON
HANDLER 'SklearnKMeans'
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: "1.0.0"
description: Perform K-Means clustering on selected columns
config:
type: object
properties:
k:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
id_column:
type: string
title: ID Column
x-ui:
widget: select
optionsSource:
type: inputColumns
port: input_data
columns:
type: array
items:
type: string
title: Feature Columns
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
required:
- k
- id_column
- columns
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: output
title: Clustered Data
"""

class SklearnKMeans:
def __init__(self):
self.id_col = None
self.feature_cols = None
self.k = None
self.rows = []
self.features = []

def eval(self, row, id_column, columns, k):
if self.id_col is None:
self.id_col = id_column
if self.feature_cols is None:
self.feature_cols = columns
if self.k is None:
self.k = max(1, int(k))

row_dict = row.asDict(recursive=False)
self.rows.append(row_dict)

feats = []
for c in self.feature_cols:
v = row_dict.get(c)
if v is None:
v = 0.0
feats.append(float(v))
self.features.append(feats)

def terminate(self):
import numpy as np
from sklearn.cluster import KMeans

if not self.rows:
return

X = np.asarray(self.features, dtype=float)
n_samples = X.shape[0]
n_clusters = min(self.k, n_samples)

model = KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=42
)
labels = model.fit_predict(X)

for row_dict, label in zip(self.rows, labels):
yield str(row_dict[self.id_col]), int(label)
$$

a etapa 4: Teste com dados de amostra

Crie dados de amostra do cliente para teste:

SQL
-- Create sample customer data
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
('C001', 25, 35000, 20),
('C002', 45, 85000, 80),
('C003', 35, 55000, 50),
('C004', 50, 95000, 90),
('C005', 23, 30000, 15),
('C006', 40, 75000, 70),
('C007', 60, 100000, 95),
('C008', 30, 45000, 40)
AS t(customer_id, age, annual_income, spending_score);

Teste a UDTF do K-Means:

SQL
-- Run K-Means clustering with 3 clusters
SELECT * FROM main.my_schema.k_means(
input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
k => 3,
id_column => 'customer_id',
columns => array('age', 'annual_income', 'spending_score')
)

Nesse caso, você deseja join os resultados clustering com os dados originais para visualizar as atribuições cluster :

SQL
-- Join cluster results with original data
SELECT
c.*,
k.cluster_id
FROM customers c
INNER JOIN main.my_schema.k_means(
input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
k => 3,
id_column => 'customer_id',
columns => array('age', 'annual_income', 'spending_score')
) k
ON c.customer_id = k.id
ORDER BY k.cluster_id, c.customer_id

o passo 5: registrar a operadora

Para usar o operador no LakeFlow Designer, você deve registrá-lo, adicionando-o ao seu arquivo .user_defined_operators.yaml :

YAML
operators:
- catalog: main
schema: my_schema
functionName: k_means
nota

Se você definir esse arquivo na sua pasta de usuário, ele aparecerá apenas para você. Para obter mais informações, consulte Torne seu operador detectável.

o passo 6: Configurar permissões

Conceda acesso aos usuários que precisam usar esta operadora:

SQL
GRANT USE SCHEMA ON SCHEMA main.my_schema TO `<user>`;
GRANT EXECUTE ON FUNCTION main.my_schema.k_means TO `<user>`;

Utilizando o operador no LakeFlow Designer

Após o registro, o operador aparecerá no LakeFlow Designer com as seguintes características:

  • Uma porta de entrada para conectar sua fonte de dados.
  • Um dropdown para selecionar qual coluna identifica exclusivamente as linhas.
  • Uma seleção múltipla para escolher quais colunas usar como recurso clustering .
  • Um número de entrada para a quantidade desejada de clusters.

Os usuários podem segmentar clientes, produtos ou quaisquer outros dados em grupos significativos sem precisar escrever código.

Dicas para criar UDTFs

  1. Inicializar estado em __init__ — Configurar listas/variáveis vazias para acumular dados
  2. Acumule em eval — Não processe ainda, apenas colete os dados
  3. Processo em terminate — É aqui que o trabalho de verdade acontece.
  4. Use yield para retornar linhas — Retorna os resultados um de cada vez de terminate
  5. Lidar com casos extremos — E se houver menos linhas do que clusters?
  6. Mantenha os tipos explícitos — as UDTFs retornadas não podem referenciar tipos de entrada.