メインコンテンツまでスキップ

宣言的な特徴を備えたモデルをトレーニングする

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

このページでは、モデルのトレーニングに宣言型機能を使用する方法について説明します。 宣言型機能の定義に関する情報については、 「宣言型機能」を参照してください。

要件

  • 機能は宣言型機能APIを使用して作成する必要があります。宣言型機能を参照してください。

APIメソッド

create_training_set()

宣言型特徴量を作成した後は、次のステップとしてモデル用のトレーニングデータを作成します。これを行うには、ラベル付きデータセットをcreate_training_setに渡します。これにより、各特徴量の値が特定の時点で正確に計算されることが自動的に保証されます。

例えば:

Python
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
) -> TrainingSet

TrainingSet.load_dfを呼び出して、元のトレーニング データをポイントインタイムの動的コンピュート機能と結合します。

引数dfは、以下の要件を満たす必要があります。

  • 機能定義によって参照されるすべてのエンティティ列を含める必要があります。
  • フィーチャ定義で参照される時系列列が含まれている必要があります。
  • 任意のRequestSourceスキーマで宣言されているすべての列が含まれている必要があります。型は宣言されたスキーマに対して検証されます。不一致がある場合はエラーが発生します(暗黙的な型変換は行われません)。
  • ラベル列が含まれている必要があります。
  • エンティティ列名、時系列列名、およびリクエスト機能列名のセットは、すべてのソースにおいてグローバルに一意である必要があります。

ポイントインタイムの正確性: テーブル ソースに裏付けられた集計およびColumnSelection機能の場合、将来のモデル トレーニングへのデータ漏洩を防ぐために、各行のタイムスタンプより前に利用可能なソース データのみを使用して機能がコンピュートされます。 RequestSource特徴量については、値はラベル付きDataFrameフレームの行から直接取得されます。

log_model()

MLflowを使用して、リネージ追跡と推論中の自動特徴検索のための特徴メタデータを含むモデルをログに記録します。

Python
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)

flavorでは、使用するMLflowモデル フレーバーモジュール ( mlflow.sklearnmlflow.xgboostなど) を指定します。

TrainingSetでログに記録されたモデルは、トレーニングで使用された機能へのリネージを自動的に追跡します。 トレーニングセットにRequestSourceの特徴量が含まれている場合、 RequestSource個の列が必須入力としてMLflowモデルのシグネチャに追加されます。これにより、サービス提供エンドポイントのAPIスキーマが、呼び出し元が推論時に提供する必要のあるフィールドを反映していることが保証されます。詳細については、特徴量テーブルでトレーニングするモデルをご覧ください。

score_batch()

自動特徴量検索によるバッチ推論を実行する:

Python
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame

score_batch モデルとともに保存された特徴メタデータを使用して、推論用の特定時点の正しい特徴を自動的にコンピュートし、トレーニングとの一貫性を確保します。 詳細については、特徴量テーブルでトレーニングするモデルをご覧ください。

ワークフローの例

Python
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient
from sklearn.ensemble import RandomForestClassifier

fe = FeatureEngineeringClient()

# Assume features are registered in UC
# labeled_df should have columns "user_id", "transaction_time", and "is_fraud"

# 1. Create training set using declarative features
training_set = fe.create_training_set(
df=labeled_df,
features=features,
label="is_fraud",
)

# 2. Load training data with computed features
training_df = training_set.load_df()
X = training_df.drop("is_fraud").toPandas()
y = training_df.select("is_fraud").toPandas().values.ravel()

# 3. Train model
model = RandomForestClassifier().fit(X, y)

# 4. Log model with feature metadata
with mlflow.start_run():
fe.log_model(
model=model,
artifact_path="fraud_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="main.ecommerce.fraud_model",
)

# 5. Batch scoring with automatic feature lookup
# inference_df must contain the same entity and timeseries columns
# used during training. Features are automatically computed.
predictions = fe.score_batch(
model_uri="models:/main.ecommerce.fraud_model/1",
df=inference_df,
)
predictions.display()

RequestSourceの機能を使ったトレーニング

モデルが推論時に提供されるデータ(API呼び出しからのトランザクションの詳細など)を必要とする場合は、テーブルベースの機能と併せてRequestSource機能を使用してください。トレーニング中に、ラベル付きDataFrameからRequestSource列が抽出されます。

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
DeltaTableSource, Feature, FieldDefinition, RequestSource,
ScalarDataType, ColumnSelection,
)

fe = FeatureEngineeringClient()

# RequestSource provides transaction data at inference time
request_source = RequestSource(
schema=[
FieldDefinition(name="transaction_amount", data_type=ScalarDataType.DOUBLE),
FieldDefinition(name="vendor_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_id", data_type=ScalarDataType.STRING),
FieldDefinition(name="transaction_time", data_type=ScalarDataType.DATE),
]
)

delta_source = DeltaTableSource(
catalog_name="catalog",
schema_name="schema",
table_name="vendor_data",
)

# A column selection feature from the request source (pass-through)
latest_transaction_amount = Feature(
source=request_source,
function=ColumnSelection("transaction_amount"),
name="latest_transaction_amount",
)

# A lookup feature from a delta table
vendor_category = Feature(
source=delta_source,
function=ColumnSelection("vendor_category"),
entity=["vendor_id"],
timeseries_column="transaction_time",
name="vendor_category",
)

# labels_df must contain: transaction_id, transaction_time, vendor_id,
# transaction_amount, and the label column.
ts = fe.create_training_set(
df=labels_df,
features=[latest_transaction_amount, vendor_category],
label="is_fraud",
exclude_columns=["card_id"],
)

import mlflow
from sklearn.ensemble import RandomForestClassifier

with mlflow.start_run():
training_df = ts.load_df().toPandas()
X = training_df.drop(columns=["is_fraud"])
y = training_df["is_fraud"]
model = RandomForestClassifier().fit(X, y)

# log_model() adds RequestSource columns to the MLflow model signature
fe.log_model(
model=model,
artifact_path="fraud_model",
flavor=mlflow.sklearn,
training_set=ts,
registered_model_name="catalog.schema.fraud_model",
)

サービング時に生のモデルに届くもの

Feature Storeモデルラッパーは、列をフィルタリングしてから、生のモデルに渡します。

列のタイプ

内部モデルに到達しますか?

明示的な特徴出力( ColumnSelection 、集約)

はい

RequestSource 特徴量として宣言された列

はい

エンティティ列(ルックアップキー)

いいえ(機能として明示的に宣言されていない限り)

時系列コラム

いいえ(機能として明示的に宣言されていない限り)