メインコンテンツまでスキップ

機能を使用してモデルをトレーニングします

この記事では、 Unity Catalogでの特徴量エンジニアリングまたは従来のワークスペース Feature Store を使用してモデルをトレーニングする方法について説明します。 まず、使用する特徴とその結合方法を定義するトレーニングデータセットを作成する必要があります。 その後、モデルをトレーニングすると、モデルは特徴への参照を保持します。

Unity Catalogでの特徴量エンジニアリングを使用してモデルを学習させると、Catalog Explorer でモデルのリネージを表示できます。モデルの作成に使用されたテーブルと関数は、自動的に追跡され、表示されます。 「Feature governance (機能ガバナンス)」と「リネージ」を参照してください

モデルを推論に使用する場合、特徴ストアから特徴値を取得するように選択できます。 特徴量テーブルモデルは、MLflow pyfunc インターフェースとも互換性があるため、MLflow を使用して特徴量テーブルでバッチ推論を実行できます。

モデルでは、トレーニングに使用できるテーブルは最大 50 個と 100 個です。

トレーニング データセットを作成する

特徴量テーブル for model トレーニングから特定の特徴量を選択するには、FeatureEngineeringClient.create_training_set ( Unity Catalogでの特徴量エンジニアリング) または FeatureStoreClient.create_training_set (ワークスペース Feature Store) API と FeatureLookup というオブジェクトを使用してトレーニング データセットを作成します。 FeatureLookupには、特徴量テーブルの名前、特徴量テーブルをcreate_training_setに渡された DataFrame と結合するときに使用するキーなど、トレーニングセットで使用する各機能を指定します。 詳細については 、「Feature Lookup 」を参照してください。

FeatureLookupを作成するときは、feature_names パラメーターを使用します。feature_names は、トレーニング セットの作成時に特徴量テーブル内のすべての機能 (主キーを除く) を検索するために 1 つの特徴名、特徴名のリスト、または None を取ります。

注記

DataFrame 内の lookup_key 列の型と順序は、参照特徴量テーブルの主キー (タイムスタンプ キーを除く) の型と順序と一致する必要があります。

この記事には、両方のバージョンの構文のコード例が含まれています。

この例では、 trainingSet.load_df によって返される DataFrame には、 feature_lookupsの各機能の列が含まれています。 create_training_set に提供された DataFrame のすべての列は、 exclude_columnsを使用して除外された列を除き、保持されます。

Python
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key='customer_id'
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

ルックアップキーがプライマリキーと一致しない場合のTrainingSetの作成

FeatureLookup の引数 lookup_key をトレーニング セットの列名に使用します。create_training_set は、 lookup_key 引数で指定されたトレーニング セットの列間で、特徴量テーブルの作成時に主キーが指定された順序を使用して、順序付けられた結合を実行します。

この例では、 recommender_system.customer_features のプライマリ・キーは customer_iddtです。

recommender_system.product_features 特徴量テーブルには、主キー product_idがあります。

training_df に次の列があるとします。

  • cid
  • transaction_dt
  • product_id
  • rating

次のコードは、 TrainingSetの正しい機能ルックアップを作成します。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key=['cid', 'transaction_dt']
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]

create_training_set が呼び出されると、次のコードに示すように、左結合を実行し、(customer_id,dt) に対応するキー (cid,transaction_dt) を使用してテーブル recommender_system.customer_featurestraining_df を結合して、トレーニングデータセットを作成します。

Python
customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)

異なる特徴量テーブルから同じ名前の 2 つの特徴を含む TrainingSet を作成します

オプションの引数 output_nameFeatureLookupで使用します。 指定された名前は、 TrainingSet.load_dfによって返される DataFrame の機能名の代わりに使用されます。 たとえば、次のコードでは、 training_set.load_df によって返される DataFrame には、列 customer_heightproduct_heightが含まれます。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['height'],
lookup_key='customer_id',
output_name='customer_height',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['height'],
lookup_key='product_id',
output_name='product_height'
),
]

fe = FeatureEngineeringClient()

with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id']
)
training_df = training_set.load_df()

同じ機能を何度も使ってトレーニングセットを作成してください

異なるルックアップ キーで結合された同じフィーチャを使用して TrainingSet を作成するには、複数の FeatureLookup を使用します。機能検索の出力ごとに固有の output_name を使用してください。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['pickup_zip'],
output_name='pickup_temp'
),
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['dropoff_zip'],
output_name='dropoff_temp'
)
]

教師なし機械学習モデルの TrainingSet を作成する

教師なし学習モデルの TrainingSet を作成するときに label=None を設定します。 たとえば、次の TrainingSet では、次のことができます。 は、さまざまな顧客の興味に基づいてグループにクラスターするために使用されます。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['interests'],
lookup_key='customer_id',
),
]

fe = FeatureEngineeringClient()
with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label=None,
exclude_columns=['customer_id']
)

training_df = training_set.load_df()

View を特徴量テーブルとして使用する場合の TrainingSet の作成

ビューを特徴量テーブルとして使用するには、 Databricks Runtime 16.0 MLに組み込まれている 0.7.0 以降のバージョンを使用する必要がありますdatabricks-feature-engineering

ビューは、ソース Delta テーブルからの単純な SELECT ビューである必要があります。 単純な SELECT ビューは、特徴量テーブルとして使用でき、プライマリ・キーが JOIN、グループ BY、または DISTINCT 句なしで選択されている 内の 1 つの ・テーブルから作成されたビューとして定義されます。DeltaUnity CatalogSQL 文で使用できるキーワードは、SELECT、FROM、WHERE、ORDER BY、LIMIT、および OFFSET です。

次の例では、 ml.recommender_system.customer_table にプライマリ・キー ( ciddtがあります。ここで、 dt は時系列カラムです。 この例では、データフレームtraining_dfに列 、dt``cidlabelがあることを前提としています。

Python
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

customer_features_df = spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")

feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['pid', 'rating'],
lookup_key=['cid'],
timestamp_lookup_key='dt'
),
]

fe = FeatureEngineeringClient()

training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='label'
)

training_df = training_set.load_df()

特徴量テーブルを使用したモデルの学習とバッチ推論の実行

Feature Store の特徴を使用してモデルをトレーニングすると、モデルは特徴への参照を保持します。 推論にモデルを使用する場合、Feature Store から特徴値を取得するように選択できます。 モデルで使用されるフィーチャの主キーを指定する必要があります。 モデルは、ワークスペースの Feature Store から必要な特徴を取得します。 その後、スコアリング中に必要に応じて特徴値を結合します。

推論時の機能検索をサポートするには、次のようにします。

  • FeatureEngineeringClient ( Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient (ワークスペース Feature Storeの場合) のlog_model方法を使用してモデルをログに記録する必要があります。

  • TrainingSet.load_df から返された DataFrame を使用して、モデルをトレーニングする必要があります。この DataFrame をモデルのトレーニングに使用する前に何らかの方法で変更した場合、推論にモデルを使用するときに変更は適用されません。 これにより、モデルのパフォーマンスが低下します。

  • モデルの種類には、MLflow に対応する python_flavor が必要です。 MLflow では、次のようなほとんどの Python モデル トレーニング フレームワークがサポートされています。

    • Scikit-Learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras( python_flavor mlflow.kerasを使用)
  • カスタム MLflow pyfunc モデル

Python
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]

fe = FeatureEngineeringClient()

with mlflow.start_run():

# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df().toPandas()

# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating

model = linear_model.LinearRegression().fit(X_train, y_train)

fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

特徴メタデータでパッケージ化されたモデルをスコアリングするときにカスタム特徴値を使用する

既定では、特徴メタデータと共にパッケージ化されたモデルは、推論時に特徴量テーブルから特徴を検索します。 スコアリングにカスタム特徴量値を使用するには、DataFrame FeatureEngineeringClient.score_batch(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.score_batch (ワークスペースFeature Store の場合) に渡される にそれらを含めます。

たとえば、次の 2 つの機能を持つモデルをパッケージ化するとします。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['account_creation_date', 'num_lifetime_purchases'],
lookup_key='customer_id',
),
]

推論では、 account_creation_dateという名前の列を含む DataFrame で score_batch を呼び出すことで、機能account_creation_dateのカスタム値を指定できます。この場合、API は Feature Store から num_lifetime_purchases 特徴のみを検索し、指定されたカスタム account_creation_date 列の値をモデルのスコアリングに使用します。

Python
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
model_uri='models:/ban_prediction_model/1',
df=batch_df
)

Feature Store の特徴と Feature Store の外部に存在するデータの組み合わせを使用して、モデルの学習とスコア付けを行います

Feature Storeの機能とFeature Store以外のデータを組み合わせてモデルをトレーニングできます。 モデルを特徴メタデータと一緒にパッケージ化すると、モデルは推論のために機能ストアから特徴値を取得します。

モデルをトレーニングするには、DataFrame FeatureEngineeringClient.create_training_set(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.create_training_set (ワークスペースFeature Store の場合) に渡される に列として追加データを含めます。この例では、 Feature Store から特徴量total_purchases_30dと外部列browserを使用します。

Python
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id'] # 'browser' is not excluded
)

推論時には、 FeatureStoreClient.score_batch で使用される DataFrame には browser 列が含まれている必要があります。

Python
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)

MLflow を使用してモデルを読み込み、バッチ推論を実行する

FeatureEngineeringClient ( Unity Catalogでの特徴量エンジニアリング) または FeatureStoreClient (ワークスペース Feature Store の場合) のlog_model方法を使用してモデルをログに記録した後、推論で MLflow を使用できます。 mlflow.pyfunc.predict Feature Store から特徴量を取得し、推論時に指定された値も結合します。 モデルで使用されるフィーチャの主キーを指定する必要があります。

注記

MLflow を使用したバッチ推論には、MLflow バージョン 2.11 以降が必要です。 time series 特徴量テーブルを使用するモデルはサポートされていません。time series 特徴量テーブルでバッチ推論を行うには、 score_batch. 特徴量テーブルを使用したモデルの学習とバッチ推論の実行を参照してください。

Python
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]

fe = FeatureEngineeringClient()

with mlflow.start_run():

# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df().toPandas()

# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating

model = linear_model.LinearRegression().fit(X_train, y_train)

fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model",
#refers to the default value of "result_type" if not provided at inference
params={"result_type":"double"},
)

# Batch inference with MLflow

# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.

# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)

# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})

# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)

欠落している特徴量の処理

存在しないルックアップ キーが予測のためにモデルに渡されると、 FeatureLookup によってフェッチされる特徴値は NaNです。