チュートリアル: K-means クラスタリング
プレビュー
この機能は パブリック プレビュー段階です。
このチュートリアルではscikit-learnを使用して K-means クラスタリングを実行するLakeflow Designer 用のPythonユーザー定義テーブル関数 (UDTF) 演算子を構築する方法を示します。 UDTFは、データセット全体を処理する機械学習タスクに最適です。ユーザー定義演算子に関する背景情報については、 Lakeflow Designer のユーザー定義演算子」を参照してください。
概要
このチュートリアルでは、 Pythonを使用して UDTF ユーザー定義オペレーターを作成する手順を説明します。 この演算子は、選択した列に対して K-Means クラスタリングを実行し、ユーザーが次の操作を実行できるようにします。
- 特徴として使用する列を選択します。
- クラスターの数を指定します。
- 各行のクラスター割り当てを含むテーブルを取得します。
ステップ 1: UDTF ハンドラー パターンを理解する
UDTF は、次の 2 つの主要なメソッドを持つ Python クラスとして実装されます。
eval(row, ...)— 入力行ごとに呼び出され、データを蓄積しますterminate()— すべての行が処理されて結果が生成された後、呼び出されます
このパターンにより、UDTF は次のことが可能になります。
eval()通話中にすべてのデータポイントを収集する- K-Meansモデルをトレーニングする
terminate() - 行ごとにクラスター化された結果を生成します
class SklearnKMeans:
def __init__(self):
self.id_col = None
self.feature_cols = None
self.k = None
self.rows = []
self.features = []
def eval(self, row, id_column, columns, k):
"""Called one time per input row - accumulate data here."""
# Initialize configuration on first row
if self.id_col is None:
self.id_col = id_column
if self.feature_cols is None:
self.feature_cols = columns
if self.k is None:
self.k = max(1, int(k))
# Convert row to dictionary and store
row_dict = row.asDict(recursive=False)
self.rows.append(row_dict)
# Extract numeric features
feats = []
for c in self.feature_cols:
v = row_dict.get(c)
if v is None:
v = 0.0
feats.append(float(v))
self.features.append(feats)
def terminate(self):
"""Called after all rows - train model and yield results."""
import numpy as np
from sklearn.cluster import KMeans
if not self.rows:
return
X = np.asarray(self.features, dtype=float)
n_samples = X.shape[0]
n_clusters = min(self.k, n_samples)
model = KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=42
)
labels = model.fit_predict(X)
# Yield results row by row
for row_dict, label in zip(self.rows, labels):
yield str(row_dict[self.id_col]), int(label)
eval()のrow問題はPySpark Row オブジェクトです。 アクセスしやすいように、 .asDict()を使用して辞書に変換します。
ステップ 2: オペレーターの YAML を作成する
YAML 構成は、 Lakeflow Designer でオペレーターがどのように表示されるかを定義します。 この演算子の場合:
- Number (
k): 作成するクラスターの数 - ウィジェットを選択 (
id_column): 入力テーブルからの列がドロップダウンに入力されました - 複数選択ウィジェット (
columns): 複数のフィーチャ列の選択 optionsSource: 入力テーブル スキーマからドロップダウンを自動的に入力します- 入力ポート : この演算子が表形式のデータを受け入れることを指定します
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: '1.0.0'
description: Perform K-Means clustering on selected columns
config:
type: object
properties:
k:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
id_column:
type: string
title: ID Column
x-ui:
widget: select
optionsSource:
type: inputColumns
port: input_data
columns:
type: array
items:
type: string
title: Feature Columns
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
required:
- k
- id_column
- columns
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: output
title: Clustered Data
利用可能なすべてのプロパティ、データ型、ウィジェット、およびオプションの包括的なガイドについては、ユーザー定義演算子 YAML リファレンスを参照してください。
ステップ 3: Unity Catalog関数を作成する
YAML 構成と Python ハンドラー クラスを 1 つのCREATE FUNCTIONステートメントに結合します。
CREATE OR REPLACE FUNCTION main.my_schema.k_means(
input_data TABLE,
id_column STRING,
columns ARRAY<STRING>,
k INT
)
RETURNS TABLE (
id STRING,
cluster_id INT
)
LANGUAGE PYTHON
HANDLER 'SklearnKMeans'
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udtf
name: K-Means Clustering
id: kmeans
version: "1.0.0"
description: Perform K-Means clustering on selected columns
config:
type: object
properties:
k:
type: number
title: Number of Clusters
default: 3
minimum: 1
maximum: 100
x-ui:
widget: number
id_column:
type: string
title: ID Column
x-ui:
widget: select
optionsSource:
type: inputColumns
port: input_data
columns:
type: array
items:
type: string
title: Feature Columns
x-ui:
widget: multi-select
optionsSource:
type: inputColumns
port: input_data
required:
- k
- id_column
- columns
additionalProperties: false
ports:
input:
- name: input_data
title: Input Data
output:
- name: output
title: Clustered Data
"""
class SklearnKMeans:
def __init__(self):
self.id_col = None
self.feature_cols = None
self.k = None
self.rows = []
self.features = []
def eval(self, row, id_column, columns, k):
if self.id_col is None:
self.id_col = id_column
if self.feature_cols is None:
self.feature_cols = columns
if self.k is None:
self.k = max(1, int(k))
row_dict = row.asDict(recursive=False)
self.rows.append(row_dict)
feats = []
for c in self.feature_cols:
v = row_dict.get(c)
if v is None:
v = 0.0
feats.append(float(v))
self.features.append(feats)
def terminate(self):
import numpy as np
from sklearn.cluster import KMeans
if not self.rows:
return
X = np.asarray(self.features, dtype=float)
n_samples = X.shape[0]
n_clusters = min(self.k, n_samples)
model = KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=42
)
labels = model.fit_predict(X)
for row_dict, label in zip(self.rows, labels):
yield str(row_dict[self.id_col]), int(label)
$$
ステップ 4: サンプルデータでテストする
テスト用のサンプル顧客データを作成します。
-- Create sample customer data
CREATE OR REPLACE TEMP VIEW customers AS
SELECT * FROM VALUES
('C001', 25, 35000, 20),
('C002', 45, 85000, 80),
('C003', 35, 55000, 50),
('C004', 50, 95000, 90),
('C005', 23, 30000, 15),
('C006', 40, 75000, 70),
('C007', 60, 100000, 95),
('C008', 30, 45000, 40)
AS t(customer_id, age, annual_income, spending_score);
K-Means UDTF をテストします。
-- Run K-Means clustering with 3 clusters
SELECT * FROM main.my_schema.k_means(
input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
k => 3,
id_column => 'customer_id',
columns => array('age', 'annual_income', 'spending_score')
)
この場合、クラスタリングの結果を元のデータと結合して、クラスタの割り当てを確認します。
-- Join cluster results with original data
SELECT
c.*,
k.cluster_id
FROM customers c
INNER JOIN main.my_schema.k_means(
input_data => TABLE(SELECT * FROM customers) WITH SINGLE PARTITION,
k => 3,
id_column => 'customer_id',
columns => array('age', 'annual_income', 'spending_score')
) k
ON c.customer_id = k.id
ORDER BY k.cluster_id, c.customer_id
ステップ 5: オペレーターを登録する
Lakeflow Designer でオペレーターを使用するには、それを.user_defined_operators.yamlファイルに追加して登録する必要があります。
operators:
- catalog: main
schema: my_schema
functionName: k_means
このファイルをユーザーフォルダ内に定義した場合、そのファイルはあなた以外には表示されません。詳細については、 「オペレータを検出可能にする」を参照してください。
ステップ 6: 権限を設定する
この演算子を使用する必要があるユーザーにアクセス権を付与します。
GRANT USE SCHEMA ON SCHEMA main.my_schema TO `<user>`;
GRANT EXECUTE ON FUNCTION main.my_schema.k_means TO `<user>`;
Lakeflow Designer での演算子の使用
登録されると、オペレーターはLakeflow Designer に次のように表示されます。
- データソースを接続するための入力ポート
- 行を一意に識別する列を選択するためのドロップダウン
- クラスタリング機能として使用する列を選択するための複数選択
- 希望するクラスターの数を入力する数値
ユーザーは、コードを書かずに、顧客、製品、またはその他のデータを意味のあるグループに分割できます。
UDTF構築のヒント
__init__状態を初期化します - データを蓄積するために空のリスト/変数を設定しますevalに蓄積 — まだ処理せず、データを収集するだけですterminateのプロセス — ここで実際の作業が行われますyieldを使用して行を返します - 結果を1つずつ返しますterminate- エッジケースの処理 - クラスターよりも行数が少ない場合はどうなるでしょうか?
- 型を明示的に保持する - UDTFの戻り値は入力型を参照できません