コンピュート Python ユーザー定義関数 を使用したオンデマンド機能

この記事では、Databricks でオンデマンド機能を作成して使用する方法について説明します。

リアルタイム アプリケーションの機械学習モデルでは、多くの場合、最新の特徴値が必要です。 図に示されている例では、レストラン レコメンデーション モデルの 1 つの特徴は、レストランからのユーザーの現在の距離です。 この特徴量は、「オンデマンド」、つまりスコアリング要求時に計算する必要があります。 スコアリング要求を受信すると、モデルはレストランの場所を検索し、事前定義された関数を適用して、ユーザーの現在地とレストランの間の距離を計算します。 その距離は、 Feature Storeから事前計算された他の特徴量とともに、モデルへの入力として渡されます。

コンピュート機能オンデマンドワークフロー

オンデマンド機能を使用するには、ワークスペースで Unity Catalog を有効にし、 Databricks Runtime 13.3 LTS 機械学習以降を使用する必要があります。

オンデマンド機能とは何ですか?

「オンデマンド」とは、値が事前にはわからないが、推論時に計算される特徴量を指します。 Databricks では、 Python ユーザー定義関数 (UDF) を使用して、オンデマンドの特徴を計算する方法を指定します。 これらの関数は Unity Catalog によって管理され、 カタログ エクスプローラーで検出できます。

ワークフロー

オンデマンドでフィーチャをコンピュートするには、フィーチャ値の計算方法を記述する Python ユーザー定義関数 (UDF) を指定します。

  • トレーニング中に、この関数とその入力バインドを create_training_set API の feature_lookups パラメーターで指定します。

  • トレーニング済みのモデルは、Feature Store メソッド log_modelを使用してログに記録する必要があります。 これにより、モデルが推論に使用されるときにオンデマンド機能が自動的に評価されます。

  • バッチ スコアリングの場合、 score_batch API は、オンデマンド機能を含むすべての機能値を自動的に計算して返します。

  • Databricks モデルサービングを使用してモデルを提供すると、モデルは自動的に Python UDF を使用して、スコアリング要求ごとにオンデマンド機能をコンピュートします。

Python UDF を作成する

Python UDF は、ノートブックまたは Databricks SQL で作成できます。

たとえば、ノートブック セルで次のコードを実行すると、カタログ main とスキーマ defaultに Python UDF example_feature が作成されます。

%sql
CREATE FUNCTION main.default.example_feature(x INT, y INT)
RETURNS INT
LANGUAGE PYTHON
COMMENT 'add two numbers'
AS $$
def add_numbers(n1: int, n2: int) -> int:
  return n1 + n2

return add_numbers(x, y)
$$

コードの実行後、「 カタログエクスプローラ」(Catalog Explorer ) で 3 レベルのネームスペース内を移動して、関数定義を表示できます。

カタログエクスプローラの機能

Python UDF の作成の詳細については、「 Python UDF を Unity Catalog に登録する 」および 「SQL 言語マニュアル」を参照してください。

欠損している特徴量の値を処理する方法

Python UDF が FeatureLookup の結果に依存する場合、要求されたルックアップ キーが見つからない場合に返される値は環境によって異なります。 score_batchを使用する場合、返される値は Noneです。オンライン配信を使用する場合、返される値は float("nan")です。

次のコードは、両方のケースを処理する方法の例です。

%sql
CREATE OR REPLACE FUNCTION square(x INT)
RETURNS INT
LANGUAGE PYTHON AS
$$
import numpy as np
if x is None or np.isnan(x):
  return 0
return x * x
$$

トレーニングする オンデマンド機能 を使用したモデル

モデルをトレーニングするには、 feature_lookups パラメーターで create_training_set API に渡される FeatureFunctionを使用します。

次のコード例では、前のセクションで定義した Python UDF main.default.example_feature を使用しています。

# Install databricks-feature-engineering first with:
# %pip install databricks-feature-engineering
# dbutils.library.restartPython()

from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering import FeatureFunction, FeatureLookup
from sklearn import linear_model

fe = FeatureEngineeringClient()

features = [
  # The feature 'on_demand_feature' is computed as the sum of the the input value 'new_source_input'
  # and the pre-materialized feature 'materialized_feature_value'.
  # - 'new_source_input' must be included in base_df and also provided at inference time.
  #   - For batch inference, it must be included in the DataFrame passed to 'FeatureEngineeringClient.score_batch'.
  #   - For real-time inference, it must be included in the request.
  # - 'materialized_feature_value' is looked up from a feature table.

  FeatureFunction(
      udf_name="main.default.example_feature",    # UDF must be in Unity Catalog so uses a three-level namespace
      input_bindings={
        "x": "new_source_input",
        "y": "materialized_feature_value"
      },
      output_name="on_demand_feature",
  ),
  # retrieve the prematerialized feature
  FeatureLookup(
    table_name = 'main.default.table',
    feature_names = ['materialized_feature_value'],
    lookup_key = 'id'
  )
]

# base_df includes the columns 'id', 'new_source_input', and 'label'
training_set = fe.create_training_set(
  df=base_df,
  feature_lookups=features,
  label='label',
  exclude_columns=['id', 'new_source_input', 'materialized_feature_value']     # drop the columns not used for training
)

# The training set contains the columns 'on_demand_feature' and 'label'.
training_df = training_set.load_df().toPandas()

# training_df columns ['materialized_feature_value', 'label']
X_train = training_df.drop(['label'], axis=1)
y_train = training_df.label

model = linear_model.LinearRegression().fit(X_train, y_train)

モデルをログに記録し、 Unity Catalog に登録する

フィーチャ メタデータと共にパッケージ化されたモデルは 、 Unity Catalogに登録できます。モデルの作成に使用するフィーチャー テーブルは、 Unity Catalogに格納する必要があります。

モデルが推論に使用されるときにオンデマンド機能を自動的に評価するようにするには、次のようにレジストリ URI を設定し、モデルをログに記録する必要があります。

import mlflow
mlflow.set_registry_uri("databricks-uc")

fe.log_model(
    model=model,
    artifact_path="main.default.model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="main.default.recommender_model"
)

オンデマンド機能を定義する Python UDF が Python パッケージをインポートする場合は、引数 extra_pip_requirementsを使用してこれらのパッケージを指定する必要があります。 例えば:

import mlflow
mlflow.set_registry_uri("databricks-uc")

fe.log_model(
    model=model,
    artifact_path="model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="main.default.recommender_model",
    extra_pip_requirements=["scikit-learn==1.20.3"]
)

制約

オンデマンド機能は、MapType と ArrayType を除く Feature Store でサポートされているすべてのデータ型 を出力できます。

ノートブックの例: オンデマンド機能

次のノートブックは、オンデマンド機能を使用するモデルをトレーニングしてスコア付けする方法の例を示しています。

基本的なオンデマンド機能のデモノートブック

ノートブックを新しいタブで開く

次のノートブックは、レストラン推奨モデルの例を示しています。 レストランの場所は、Databricks のオンライン テーブルから検索されます。 ユーザーの現在地は、スコアリング要求の一部として送信されます。 このモデルはオンデマンド機能を使用して、ユーザーからレストランまでのリアルタイム距離をコンピュートします。 その距離は、モデルへの入力として使用されます。

オンライン テーブル デモ ノートブックを使用したレストラン推奨オンデマンド機能

ノートブックを新しいタブで開く

次の図は、サードパーティのオンラインストアを使用した同じレストラン推奨モデルを示しています。 レストランの場所は、DynamoDB に公開された事前にマテリアライズされた特徴量テーブルから検索されます。

DynamoDB デモノートブックを使用したレストラン推奨オンデマンド機能

ノートブックを新しいタブで開く