機能を使用してモデルをトレーニングします
この記事では、 Unity Catalogでの特徴量エンジニアリングまたは従来のワークスペース Feature Store を使用してモデルをトレーニングする方法について説明します。 まず、使用する特徴とその結合方法を定義するトレーニングデータセットを作成する必要があります。 その後、モデルをトレーニングすると、モデルは特徴への参照を保持します。
Unity Catalogでの特徴量エンジニアリングを使用してモデルを学習させると、Catalog Explorer でモデルのリネージを表示できます。モデルの作成に使用されたテーブルと関数は、自動的に追跡され、表示されます。 「Feature governance (機能ガバナンス)」と「リネージ」を参照してください。
モデルを推論に使用する場合、特徴ストアから特徴値を取得するように選択できます。 特徴量テーブルモデルは、MLflow pyfunc インターフェースとも互換性があるため、MLflow を使用して特徴量テーブルでバッチ推論を実行できます。
モデルでは、トレーニングに使用できるテーブルは最大 50 個と 100 個です。
トレーニング データセットを作成する
特徴量テーブル for model トレーニングから特定の特徴量を選択するには、FeatureEngineeringClient.create_training_set
( Unity Catalogでの特徴量エンジニアリング) または FeatureStoreClient.create_training_set
(ワークスペース Feature Store) API と FeatureLookup
というオブジェクトを使用してトレーニング データセットを作成します。 FeatureLookup
には、特徴量テーブルの名前、特徴量テーブルをcreate_training_set
に渡された DataFrame と結合するときに使用するキーなど、トレーニングセットで使用する各機能を指定します。 詳細については 、「Feature Lookup 」を参照してください。
FeatureLookup
を作成するときは、feature_names
パラメーターを使用します。feature_names
は、トレーニング セットの作成時に特徴量テーブル内のすべての機能 (主キーを除く) を検索するために 1 つの特徴名、特徴名のリスト、または None を取ります。
DataFrame 内の lookup_key
列の型と順序は、参照特徴量テーブルの主キー (タイムスタンプ キーを除く) の型と順序と一致する必要があります。
この記事には、両方のバージョンの構文のコード例が含まれています。
この例では、 trainingSet.load_df
によって返される DataFrame には、 feature_lookups
の各機能の列が含まれています。 create_training_set
に提供された DataFrame のすべての列は、 exclude_columns
を使用して除外された列を除き、保持されます。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key='customer_id'
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
from databricks.feature_store import FeatureLookup, FeatureStoreClient
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key='customer_id'
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
ルックアップキーがプライマリキーと一致しない場合のTrainingSetの作成
FeatureLookup
の引数 lookup_key
をトレーニング セットの列名に使用します。create_training_set
は、 lookup_key
引数で指定されたトレーニング セットの列間で、特徴量テーブルの作成時に主キーが指定された順序を使用して、順序付けられた結合を実行します。
この例では、 recommender_system.customer_features
のプライマリ・キーは customer_id
、 dt
です。
recommender_system.product_features
特徴量テーブルには、主キー product_id
があります。
training_df
に次の列があるとします。
cid
transaction_dt
product_id
rating
次のコードは、 TrainingSet
の正しい機能ルックアップを作成します。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key=['cid', 'transaction_dt']
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d', 'total_purchases_7d'],
lookup_key=['cid', 'transaction_dt']
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
create_training_set
が呼び出されると、次のコードに示すように、左結合を実行し、(customer_id
,dt
) に対応するキー (cid
,transaction_dt
) を使用してテーブル recommender_system.customer_features
と training_df
を結合して、トレーニングデータセットを作成します。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
異なる特徴量テーブルから同じ名前の 2 つの特徴を含む TrainingSet を作成します
オプションの引数 output_name
を FeatureLookup
で使用します。 指定された名前は、 TrainingSet.load_df
によって返される DataFrame の機能名の代わりに使用されます。 たとえば、次のコードでは、 training_set.load_df
によって返される DataFrame には、列 customer_height
と product_height
が含まれます。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['height'],
lookup_key='customer_id',
output_name='customer_height',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['height'],
lookup_key='product_id',
output_name='product_height'
),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['height'],
lookup_key='customer_id',
output_name='customer_height',
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['height'],
lookup_key='product_id',
output_name='product_height'
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
同じ機能を何度も使ってトレーニングセットを作成してください
異なるルックアップ キーで結合された同じフィーチャを使用して TrainingSet を作成するには、複数の FeatureLookup を使用します。機能検索の出力ごとに固有の output_name
を使用してください。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['pickup_zip'],
output_name='pickup_temp'
),
FeatureLookup(
table_name='ml.taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['dropoff_zip'],
output_name='dropoff_temp'
)
]
feature_lookups = [
FeatureLookup(
table_name='taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['pickup_zip'],
output_name='pickup_temp'
),
FeatureLookup(
table_name='taxi_data.zip_features',
feature_names=['temperature'],
lookup_key=['dropoff_zip'],
output_name='dropoff_temp'
)
]
教師なし機械学習モデルの TrainingSet を作成する
教師なし学習モデルの TrainingSet を作成するときに label=None
を設定します。 たとえば、次の TrainingSet では、次のことができます。
は、さまざまな顧客の興味に基づいてグループにクラスターするために使用されます。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['interests'],
lookup_key='customer_id',
),
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label=None,
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['interests'],
lookup_key='customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label=None,
exclude_columns=['customer_id']
)
training_df = training_set.load_df()
View を特徴量テーブルとして使用する場合の TrainingSet の作成
ビューを特徴量テーブルとして使用するには、 Databricks Runtime 16.0 MLに組み込まれている 0.7.0 以降のバージョンを使用する必要がありますdatabricks-feature-engineering
。
ビューは、ソース Delta テーブルからの単純な SELECT ビューである必要があります。 単純な SELECT ビューは、特徴量テーブルとして使用でき、プライマリ・キーが JOIN、グループ BY、または DISTINCT 句なしで選択されている 内の 1 つの ・テーブルから作成されたビューとして定義されます。DeltaUnity CatalogSQL 文で使用できるキーワードは、SELECT、FROM、WHERE、ORDER BY、LIMIT、および OFFSET です。
次の例では、 ml.recommender_system.customer_table
にプライマリ・キー ( cid
と dt
があります。ここで、 dt
は時系列カラムです。 この例では、データフレームtraining_df
に列 、dt``cid
、label
があることを前提としています。
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
customer_features_df = spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['pid', 'rating'],
lookup_key=['cid'],
timestamp_lookup_key='dt'
),
]
fe = FeatureEngineeringClient()
training_set = fe.create_training_set(
df=training_df,
feature_lookups=feature_lookups,
label='label'
)
training_df = training_set.load_df()
特徴量テーブルを使用したモデルの学習とバッチ推論の実行
Feature Store の特徴を使用してモデルをトレーニングすると、モデルは特徴への参照を保持します。 推論にモデルを使用する場合、Feature Store から特徴値を取得するように選択できます。 モデルで使用されるフィーチャの主キーを指定する必要があります。 モデルは、ワークスペースの Feature Store から必要な特徴を取得します。 その後、スコアリング中に必要に応じて特徴値を結合します。
推論時の機能検索をサポートするには、次のようにします。
-
FeatureEngineeringClient
( Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient
(ワークスペース Feature Storeの場合) のlog_model
方法を使用してモデルをログに記録する必要があります。 -
TrainingSet.load_df
から返された DataFrame を使用して、モデルをトレーニングする必要があります。この DataFrame をモデルのトレーニングに使用する前に何らかの方法で変更した場合、推論にモデルを使用するときに変更は適用されません。 これにより、モデルのパフォーマンスが低下します。 -
モデルの種類には、MLflow に対応する
python_flavor
が必要です。 MLflow では、次のようなほとんどの Python モデル トレーニング フレームワークがサポートされています。- Scikit-Learn
- keras
- PyTorch
- SparkML
- LightGBM
- XGBoost
- TensorFlow Keras(
python_flavor
mlflow.keras
を使用)
-
カスタム MLflow pyfunc モデル
- Feature Engineering in Unity Catalog
- Workspace Feature Store
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fe = FeatureEngineeringClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri=model_uri,
df=batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
特徴メタデータでパッケージ化されたモデルをスコアリングするときにカスタム特徴値を使用する
既定では、特徴メタデータと共にパッケージ化されたモデルは、推論時に特徴量テーブルから特徴を検索します。 スコアリングにカスタム特徴量値を使用するには、DataFrame FeatureEngineeringClient.score_batch
(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.score_batch
(ワークスペースFeature Store の場合) に渡される にそれらを含めます。
たとえば、次の 2 つの機能を持つモデルをパッケージ化するとします。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['account_creation_date', 'num_lifetime_purchases'],
lookup_key='customer_id',
),
]
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['account_creation_date', 'num_lifetime_purchases'],
lookup_key='customer_id',
),
]
推論では、 account_creation_date
という名前の列を含む DataFrame で score_batch
を呼び出すことで、機能account_creation_date
のカスタム値を指定できます。この場合、API は Feature Store から num_lifetime_purchases
特徴のみを検索し、指定されたカスタム account_creation_date
列の値をモデルのスコアリングに使用します。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
model_uri='models:/ban_prediction_model/1',
df=batch_df
)
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
model_uri='models:/ban_prediction_model/1',
df=batch_df
)
Feature Store の特徴と Feature Store の外部に存在するデータの組み合わせを使用して、モデルの学習とスコア付けを行います
Feature Storeの機能とFeature Store以外のデータを組み合わせてモデルをトレーニングできます。 モデルを特徴メタデータと一緒にパッケージ化すると、モデルは推論のために機能ストアから特徴値を取得します。
モデルをトレーニングするには、DataFrame FeatureEngineeringClient.create_training_set
(Unity Catalogでの特徴量エンジニアリングの場合) またはFeatureStoreClient.create_training_set
(ワークスペースFeature Store の場合) に渡される に列として追加データを含めます。この例では、 Feature Store から特徴量total_purchases_30d
と外部列browser
を使用します。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
]
fe = FeatureEngineeringClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id'] # 'browser' is not excluded
)
feature_lookups = [
FeatureLookup(
table_name='recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id'] # 'browser' is not excluded
)
推論時には、 FeatureStoreClient.score_batch
で使用される DataFrame には browser
列が含まれている必要があります。
- Feature Engineering in Unity Catalog
- Workspace Feature Store
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
model_uri=model_uri,
df=batch_df
)
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
model_uri=model_uri,
df=batch_df
)
MLflow を使用してモデルを読み込み、バッチ推論を実行する
FeatureEngineeringClient
( Unity Catalogでの特徴量エンジニアリング) または FeatureStoreClient
(ワークスペース Feature Store の場合) のlog_model
方法を使用してモデルをログに記録した後、推論で MLflow を使用できます。 mlflow.pyfunc.predict
Feature Store から特徴量を取得し、推論時に指定された値も結合します。 モデルで使用されるフィーチャの主キーを指定する必要があります。
MLflow を使用したバッチ推論には、MLflow バージョン 2.11 以降が必要です。 time series 特徴量テーブルを使用するモデルはサポートされていません。time series 特徴量テーブルでバッチ推論を行うには、 score_batch
. 特徴量テーブルを使用したモデルの学習とバッチ推論の実行を参照してください。
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name='ml.recommender_system.customer_features',
feature_names=['total_purchases_30d'],
lookup_key='customer_id',
),
FeatureLookup(
table_name='ml.recommender_system.product_features',
feature_names=['category'],
lookup_key='product_id'
)
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fe.create_training_set(
df=df,
feature_lookups=feature_lookups,
label='rating',
exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model",
#refers to the default value of "result_type" if not provided at inference
params={"result_type":"double"},
)
# Batch inference with MLflow
# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.
# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)
# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})
# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)
欠落している特徴量の処理
存在しないルックアップ キーが予測のためにモデルに渡されると、 FeatureLookup
によってフェッチされる特徴値は NaN
です。