Tutorial: K-means clustering
This feature is in Public Preview.
This tutorial shows how to build a Python user-defined table function (UDTF) operator for Lakeflow Designer that runs K-means clustering with scikit-learn. UDTFs are well-suited to machine learning tasks that process entire datasets. For background on user-defined operators, see User-defined operators in Lakeflow Designer.
Overview
This tutorial steps you through creating a UDTF user-defined operator using Python. The operator performs K-Means clustering on selected columns, allowing users to:
- Choose which columns to use as features.
- Specify the number of clusters.
- Get back a table with cluster assignments for each row.
Step 1: Understand the UDTF handler pattern
A UDTF is implemented as a Python class with two key methods:
- eval(row, ...): Called for each input row to accumulate data.
- terminate(): Called after all rows are processed to yield results.
This pattern allows the UDTF to:
- Collect all data points during eval() calls.
- Train the K-Means model in terminate().
- Yield clustered results row by row.
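Before the K-means handler, here is the same accumulate-then-emit pattern on a smaller problem. It is a stdlib-only sketch: CenterColumn and run_locally are hypothetical names, rows are plain dictionaries rather than PySpark Row objects, and run_locally only mimics the call order (eval() per row, then terminate()), not how Spark actually schedules handlers. As with K-means, each output depends on the whole dataset, so nothing can be emitted until every row has been seen.

```python
from typing import Any, Dict, Iterator, List, Tuple


class CenterColumn:
    """Hypothetical handler: emits each row's value minus the column mean."""

    def __init__(self) -> None:
        # State starts empty for each handler instance.
        self.ids: List[str] = []
        self.values: List[float] = []

    def eval(self, row: Dict[str, Any], id_column: str, column: str) -> None:
        # Accumulate only; the mean isn't known yet, so nothing is yielded here.
        self.ids.append(str(row[id_column]))
        self.values.append(float(row[column]))

    def terminate(self) -> Iterator[Tuple[str, float]]:
        if not self.values:
            return  # empty input: emit no rows
        mean = sum(self.values) / len(self.values)
        for row_id, value in zip(self.ids, self.values):
            yield row_id, value - mean


def run_locally(rows: List[Dict[str, Any]], **kwargs: Any) -> List[Tuple[str, float]]:
    """Rough local stand-in for the engine: eval() once per row, then terminate()."""
    handler = CenterColumn()
    for row in rows:
        handler.eval(row, **kwargs)
    return list(handler.terminate())


print(run_locally([{"id": "a", "x": 1}, {"id": "b", "x": 3}], id_column="id", column="x"))
```

The K-means handler follows the same shape; only the work done in terminate() changes.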
class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []
    def eval(self, row, id_column, columns, k):
        """Called one time per input row - accumulate data here."""
        # Initialize configuration on first row
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))
        # Convert row to dictionary and store
        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)
        # Extract numeric features
        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)
    def terminate(self):
        """Called after all rows - train model and yield results."""
        import numpy as np
        from sklearn.cluster import KMeans
        if not self.rows:
            return
        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)
        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)
        # Yield results row by row
        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)
The row parameter in eval() is a PySpark Row object. Use .asDict() to convert it to a dictionary for easier access.
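In terminate(), KMeans.fit_predict does the actual clustering. To show what that call computes, here is a plain-Python sketch of Lloyd's algorithm, the classic k-means iteration. It is an illustration, not scikit-learn's implementation: scikit-learn's KMeans defaults to k-means++ initialization and, with n_init=10, runs ten initializations and keeps the result with the lowest inertia, whereas this sketch naively seeds the centroids with the first k points. The function name lloyd_kmeans is hypothetical.

```python
from typing import List, Sequence, Tuple


def _sq_dist(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def lloyd_kmeans(
    points: List[Sequence[float]], k: int, max_iter: int = 100
) -> Tuple[List[int], List[List[float]]]:
    """Hypothetical sketch of Lloyd's k-means; returns (labels, centroids)."""
    if not points:
        return [], []  # nothing to cluster, like the handler's early return
    k = max(1, min(k, len(points)))  # clamp k the way the handler does
    centroids = [list(map(float, p)) for p in points[:k]]  # naive seeding: first k points
    labels: List[int] = []
    for _ in range(max_iter):
        # Assignment step: each point joins its nearest centroid (ties go to the lower index).
        new_labels = [min(range(k), key=lambda j: _sq_dist(p, centroids[j])) for p in points]
        if new_labels == labels:
            break  # assignments stopped changing, so the iteration has converged
        labels = new_labels
        # Update step: move each centroid to the mean of the points assigned to it.
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:  # an empty cluster keeps its previous centroid
                centroids[j] = [sum(col) / len(members) for col in zip(*members)]
    return labels, centroids


labels, centroids = lloyd_kmeans([[0, 0], [0, 1], [10, 10], [10, 11]], k=2)
print(labels)  # [0, 0, 1, 1]
```

Because the result depends on the starting centroids, scikit-learn's multiple restarts and fixed random_state make the handler's output more stable and reproducible than this naive version.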
Step 2: Create the YAML for the operator
The YAML configuration defines how the operator appears in Lakeflow Designer. For this operator:
- Number parameter (k): Number of clusters to create.
- Select widget (id_column): Dropdown populated with columns from the input table.
- Multi-select widget (columns): Lets users select multiple feature columns.
- optionsSource: Automatically populates dropdowns from the input table schema.
- Input port: Specifies that this operator accepts tabular data.
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: '1.0.0'
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data
See User-defined operator YAML reference for a comprehensive guide to all available properties, data types, widgets, and options.
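The config block appears to use JSON Schema keywords (type, properties, required, minimum, maximum, additionalProperties). How Lakeflow Designer itself validates a configuration isn't covered here; as a hedged illustration of what these constraints mean, the following hypothetical validate_config function checks a plain dictionary against the rules in this particular YAML only, not the full schema language.

```python
from typing import Any, Dict, List

# Hypothetical: mirrors the constraints declared in the config block above.
REQUIRED = ("k", "id_column", "columns")
ALLOWED = set(REQUIRED)


def validate_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config passes these checks."""
    errors: List[str] = []
    for key in REQUIRED:  # required: [k, id_column, columns]
        if key not in cfg:
            errors.append(f"missing required property: {key}")
    for key in cfg:  # additionalProperties: false
        if key not in ALLOWED:
            errors.append(f"unexpected property: {key}")
    k = cfg.get("k")
    if k is not None:
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(k, bool) or not isinstance(k, (int, float)):
            errors.append("k must be a number")
        elif not 1 <= k <= 100:  # minimum: 1, maximum: 100
            errors.append("k must be between 1 and 100")
    if "id_column" in cfg and not isinstance(cfg["id_column"], str):
        errors.append("id_column must be a string")
    cols = cfg.get("columns")
    if cols is not None and not (isinstance(cols, list) and all(isinstance(c, str) for c in cols)):
        errors.append("columns must be an array of strings")
    return errors


print(validate_config({"k": 0, "id_column": "customer_id", "columns": ["age"]}))
```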
Step 3: Create the Unity Catalog function
Combine the YAML configuration and Python handler class into a single CREATE FUNCTION statement.
CREATE OR REPLACE FUNCTION main.my_schema.k_means(
  input_data TABLE,
  id_column STRING,
  columns ARRAY<STRING>,
  k INT
)
RETURNS TABLE (
  id STRING,
  cluster_id INT
)
LANGUAGE PYTHON
HANDLER 'SklearnKMeans'
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: "1.0.0"
description: Perform K-Means clustering on selected columns
config:
  type: object
  properties:
    k:
      type: number
      title: Number of Clusters
      default: 3
      minimum: 1
      maximum: 100
      x-ui:
        widget: number
    id_column:
      type: string
      title: ID Column
      x-ui:
        widget: select
        optionsSource:
          type: inputColumns
          port: input_data
    columns:
      type: array
      items:
        type: string
      title: Feature Columns
      x-ui:
        widget: multi-select
        optionsSource:
          type: inputColumns
          port: input_data
  required:
    - k
    - id_column
    - columns
  additionalProperties: false
ports:
  input:
    - name: input_data
      title: Input Data
  output:
    - name: output
      title: Clustered Data
"""
class SklearnKMeans:
    def __init__(self):
        self.id_col = None
        self.feature_cols = None
        self.k = None
        self.rows = []
        self.features = []
    def eval(self, row, id_column, columns, k):
        if self.id_col is None:
            self.id_col = id_column
        if self.feature_cols is None:
            self.feature_cols = columns
        if self.k is None:
            self.k = max(1, int(k))
        row_dict = row.asDict(recursive=False)
        self.rows.append(row_dict)
        feats = []
        for c in self.feature_cols:
            v = row_dict.get(c)
            if v is None:
                v = 0.0
            feats.append(float(v))
        self.features.append(feats)
    def terminate(self):
        import numpy as np
        from sklearn.cluster import KMeans
        if not self.rows:
            return
        X = np.asarray(self.features, dtype=float)
        n_samples = X.shape[0]
        n_clusters = min(self.k, n_samples)
        model = KMeans(
            n_clusters=n_clusters,
            n_init=10,
            random_state=42
        )
        labels = model.fit_predict(X)
        for row_dict, label in zip(self.rows, labels):
            yield str(row_dict[self.id_col]), int(label)
$$
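If you keep the YAML and the handler in separate files, you could assemble this statement programmatically. The following is a hypothetical sketch (build_create_function is not part of any Databricks SDK) that places the YAML in a leading docstring followed by the handler source, matching the layout above. It rejects inputs that would break the quoting: a triple-quote in the YAML would end the docstring early, and $$ anywhere would end the function body early.

```python
import textwrap


def build_create_function(full_name: str, params: str, returns: str,
                          handler: str, yaml_config: str, handler_source: str) -> str:
    """Hypothetical helper: embed YAML (as a docstring) and handler code in CREATE FUNCTION."""
    yaml_text = yaml_config.strip()
    code = textwrap.dedent(handler_source).strip()
    if '"""' in yaml_text:
        raise ValueError('YAML must not contain """ (it would close the docstring early)')
    if "$$" in yaml_text or "$$" in code:
        raise ValueError("body must not contain $$ (it would close the function body early)")
    body = f'"""\n{yaml_text}\n"""\n{code}'
    return (
        f"CREATE OR REPLACE FUNCTION {full_name}(\n{params}\n)\n"
        f"RETURNS TABLE (\n{returns}\n)\n"
        "LANGUAGE PYTHON\n"
        f"HANDLER '{handler}'\n"
        f"AS $$\n{body}\n$$"
    )


stmt = build_create_function(
    "main.my_schema.k_means",
    "  input_data TABLE,\n  id_column STRING,\n  columns ARRAY<STRING>,\n  k INT",
    "  id STRING,\n  cluster_id INT",
    "SklearnKMeans",
    "name: K-Means Clustering",  # in practice, the full YAML text
    "class SklearnKMeans:\n    pass",  # in practice, the full handler source
)
print(stmt)
```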
Step 4: Test with sample data
Create sample customer data for testing:
-- Create sample customer data
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
  ('C001', 25, 35000, 20),
  ('C002', 45, 85000, 80),
  ('C003', 35, 55000, 50),
  ('C004', 50, 95000, 90),
  ('C005', 23, 30000, 15),
  ('C006', 40, 75000, 70),
  ('C007', 60, 100000, 95),
  ('C008', 30, 45000, 40)
  AS t(customer_id, age, annual_income, spending_score);
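One property of this sample is worth checking before you run the operator. The handler passes raw values to KMeans, which uses Euclidean distance, so a feature with a large numeric range dominates the distances. The following stdlib sketch (column_ranges and standardize are hypothetical helpers) measures each column's range on the eight sample rows and shows z-score standardization, one common way to give features comparable weight. The tutorial's operator does not do this; in scikit-learn, StandardScaler from sklearn.preprocessing performs the same transformation, if you choose to add it.

```python
from statistics import mean, pstdev
from typing import List, Sequence

# (age, annual_income, spending_score) from the sample customers view.
FEATURES = [
    (25, 35000, 20),
    (45, 85000, 80),
    (35, 55000, 50),
    (50, 95000, 90),
    (23, 30000, 15),
    (40, 75000, 70),
    (60, 100000, 95),
    (30, 45000, 40),
]


def column_ranges(rows: Sequence[Sequence[float]]) -> List[float]:
    """Max minus min per column: a rough gauge of each feature's pull on distances."""
    return [max(col) - min(col) for col in zip(*rows)]


def standardize(rows: Sequence[Sequence[float]]) -> List[List[float]]:
    """Z-score each column (population std dev); constant columns become 0.0."""
    stats = [(mean(col), pstdev(col)) for col in zip(*rows)]
    return [
        [(v - m) / s if s else 0.0 for v, (m, s) in zip(row, stats)]
        for row in rows
    ]


print(column_ranges(FEATURES))  # [37, 70000, 80]
```

With raw values, annual_income varies roughly a thousand times more than age, so it effectively decides the clusters on its own.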
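Test the K-Means UDTF. WITH SINGLE PARTITION sends every input row to one handler instance, so terminate() trains a single model on the full dataset: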
-- Run K-Means clustering with 3 clusters
SELECT * FROM main.my_schema.k_means(
  input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
  k => 3,
  id_column => 'customer_id',
  columns => array('age', 'annual_income', 'spending_score')
)
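The following rough simulation shows why partitioning matters for a handler that accumulates state. SeenCounter and run_partitions are hypothetical, and the real number and contents of partitions are decided by the engine; the point is only that each partition gets a fresh handler instance. For K-means, that would mean one independently trained model per partition, with cluster IDs that aren't comparable across partitions.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


class SeenCounter:
    """Hypothetical handler: reports how many rows its instance received."""

    def __init__(self) -> None:
        self.n = 0

    def eval(self, row: Dict[str, Any]) -> None:
        self.n += 1

    def terminate(self) -> Iterator[Tuple[int]]:
        yield (self.n,)


def run_partitions(partitions: Iterable[List[Dict[str, Any]]]) -> List[Tuple[int]]:
    """Simulate the engine: a new handler instance per partition, each with its own terminate()."""
    out: List[Tuple[int]] = []
    for part in partitions:
        handler = SeenCounter()
        for row in part:
            handler.eval(row)
        out.extend(handler.terminate())
    return out


rows = [{"id": i} for i in range(8)]
print(run_partitions([rows]))                # like WITH SINGLE PARTITION: [(8,)]
print(run_partitions([rows[:4], rows[4:]]))  # two partitions: [(4,), (4,)]
```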
To see each customer's attributes next to its cluster assignment, join the clustering results back to the original data:
-- Join cluster results with original data
SELECT
  c.*,
  k.cluster_id
FROM customers c
INNER JOIN main.my_schema.k_means(
  input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
  k => 3,
  id_column => 'customer_id',
  columns => array('age', 'annual_income', 'spending_score')
) k
  ON c.customer_id = k.id
ORDER BY k.cluster_id, c.customer_id
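Conceptually, the INNER JOIN matches each source row to the operator's (id, cluster_id) output by ID. The following minimal in-memory sketch uses a hypothetical join_clusters helper; the cluster labels in the example are made up for illustration and are not real output from the operator. IDs are compared as strings because the handler converts each ID with str() before yielding it.

```python
from typing import Any, Dict, Iterable, List, Tuple


def join_clusters(rows: Iterable[Dict[str, Any]],
                  assignments: Iterable[Tuple[str, int]],
                  id_key: str = "customer_id") -> List[Dict[str, Any]]:
    """Inner-join source rows with (id, cluster_id) pairs, like the SQL above."""
    cluster_by_id = dict(assignments)
    joined = [
        {**row, "cluster_id": cluster_by_id[str(row[id_key])]}
        for row in rows
        if str(row[id_key]) in cluster_by_id  # INNER JOIN drops unmatched rows
    ]
    # ORDER BY cluster_id, customer_id
    joined.sort(key=lambda r: (r["cluster_id"], r[id_key]))
    return joined


rows = [{"customer_id": "C001", "age": 25}, {"customer_id": "C002", "age": 45}]
print(join_clusters(rows, [("C001", 1), ("C002", 0)]))  # illustrative labels only
```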
Step 5: Register the operator
To use the operator in Lakeflow Designer, register it by adding it to your .user_defined_operators.yaml file:
operators:
  - catalog: main
    schema: my_schema
    functionName: k_means
If you define this file in your user folder, the operator appears only for you. For more information, see Make your operator discoverable.
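If you register several operators, you might generate this file rather than edit it by hand. The following hypothetical sketch renders the same structure without a YAML library. It does no quoting or escaping, so it assumes catalog, schema, and function names are plain identifiers; use a real YAML library if that assumption doesn't hold.

```python
from typing import Dict, Iterable


def render_operators_yaml(operators: Iterable[Dict[str, str]]) -> str:
    """Hypothetical helper: emit an operators list in the layout shown above."""
    lines = ["operators:"]
    for op in operators:
        lines.append(f"  - catalog: {op['catalog']}")
        lines.append(f"    schema: {op['schema']}")
        lines.append(f"    functionName: {op['functionName']}")
    return "\n".join(lines) + "\n"


print(render_operators_yaml([
    {"catalog": "main", "schema": "my_schema", "functionName": "k_means"},
]))
```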
Step 6: Set up permissions
Grant access to users who need to use this operator:
GRANT USE SCHEMA ON SCHEMA main.my_schema TO `<user>`;
GRANT EXECUTE ON FUNCTION main.my_schema.k_means TO `<user>`;
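When several people need access, you might generate these statements. The following hypothetical sketch (grant_statements is not a Databricks API) emits the same two GRANT statements per principal and quotes each principal in backticks, doubling any embedded backtick as Spark SQL identifier quoting expects. Unity Catalog generally also requires USE CATALOG on the parent catalog; this sketch, like the statements above, does not emit it.

```python
from typing import Iterable, List


def grant_statements(function_name: str, principals: Iterable[str]) -> List[str]:
    """Hypothetical helper: USE SCHEMA + EXECUTE grants for each principal."""
    catalog, schema, _ = function_name.split(".")  # expects catalog.schema.function
    statements: List[str] = []
    for principal in principals:
        quoted = "`" + principal.replace("`", "``") + "`"
        statements.append(f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO {quoted};")
        statements.append(f"GRANT EXECUTE ON FUNCTION {function_name} TO {quoted};")
    return statements


for stmt in grant_statements("main.my_schema.k_means", ["alice@example.com"]):
    print(stmt)
```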
Using the operator in Lakeflow Designer
After it's registered, the operator will appear in Lakeflow Designer with:
- An input port to connect your data source
- A dropdown to select which column uniquely identifies rows
- A multi-select to choose which columns to use as clustering features
- A number input for the desired number of clusters
Users can segment customers, products, or any other data into meaningful groups without writing code.
Tips for building UDTFs
- Initialize state in __init__: Set up empty lists or variables to accumulate data.
- Accumulate in eval: Don't process yet; just collect data.
- Process in terminate: This is where the real work happens.
- Use yield to return rows: Return results one at a time from terminate.
- Handle edge cases: For example, what if there are fewer rows than clusters? The handler above caps n_clusters at the number of rows and yields nothing for empty input.
- Keep types explicit: Declare every output column and its type in the RETURNS TABLE clause; the output schema can't reference the input table's types.
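The edge-case rules in the handler can be isolated as small pure functions and tested without Spark. This hypothetical sketch mirrors the handler's behavior: missing or NULL feature values become 0.0, and the cluster count is clamped to at least 1 and at most the number of rows. Replacing NULLs with 0.0 pulls those rows toward the origin; dropping them or imputing a column mean are common alternatives if that matters for your data.

```python
from typing import Any, Dict, List, Sequence


def to_features(row: Dict[str, Any], cols: Sequence[str]) -> List[float]:
    """Same rule as the handler's eval(): missing or NULL values become 0.0."""
    feats: List[float] = []
    for c in cols:
        v = row.get(c)
        feats.append(0.0 if v is None else float(v))
    return feats


def effective_k(requested: Any, n_samples: int) -> int:
    """Clamp k as the handler does: max(1, int(k)), then at most n_samples.

    Returns 0 when n_samples is 0; callers should skip training in that case,
    as terminate() does with its early return.
    """
    return min(max(1, int(requested)), n_samples)


print(to_features({"age": None, "annual_income": "2.5"}, ["age", "annual_income", "spending_score"]))
print(effective_k(5, 3))
```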