Feature Servingエンドポイントのデプロイとクエリ
この記事では、 Feature Serving エンドポイントをデプロイしてクエリを実行する方法を、手順を追って説明します。 この記事では、Databricks SDK を使用します。 一部の手順は、REST API または Databricks UI を使用して完了することもでき、それらのメソッドのドキュメントへの参照が含まれています。
この例では、都市とその場所 (緯度と経度) のテーブルと、それらの都市からのユーザーの現在の距離をアカウントに取り込むレコメンダー アプリがあります。 ユーザーの位置は常に変化するため、推論時にユーザーと各都市との間の距離を計算する必要があります。このチュートリアルでは、Databricks Online テーブルと Databricks Feature Serving を使用して、これらの計算を低レイテンシで実行する方法について説明します。サンプルコードの完全なセットについては、 サンプルノートブックを参照してください。
ステップ1.ソース テーブルを作成する
ソース テーブルには事前計算された特徴値が含まれており、主キーを持つ Unity Catalog 内の任意の Delta テーブルにすることができます。 この例では、テーブルには都市とその緯度と経度のリストが含まれています。 プライマリ・キーは destination_id
です。 サンプルデータを以下に示します。
name | destination_id(PK) | 緯度 | 経度 |
---|---|---|---|
テネシー州ナッシュビル | 0 | 36.162663 | -86.7816 |
ハワイ州ホノルル | 1 | 21.309885 | -157.85814 |
ネバダ州ラスベガス | 2 | 36.171562 | -115.1391 |
ニューヨーク州ニューヨーク | 3 | 40.712776 | -74.005974 |
ステップ2.オンライン テーブルの作成
オンライン テーブルは、オンライン アクセス用に最適化された Delta テーブルの読み取り専用コピーです。 詳細については、「リアルタイム テーブルFeature Servingにオンライン テーブルを使用する」を参照してください。
オンライン テーブルを作成するには、次の例のように、UI、 REST API 、または Databricks SDK を使用してオンライン テーブルを作成します 。
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow
workspace = WorkspaceClient()
# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"
spec = OnlineTableSpec(
primary_key_columns=["destination_id"],
source_table_full_name = feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
perform_full_copy=True)
# ignore "already exists" error
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
pprint(workspace.online_tables.get(online_table_name))
ステップ3.Unity Catalog で関数を作成する
この例では、関数は目的地 (位置が変わらない) とユーザー (位置が頻繁に変更され、推論の時点までわからない) との間の距離を計算します。
# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"
spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)
# Earth's radius in kilometers
radius = 6371
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c
return distance
$$""")
ステップ4.Unity Catalog で機能仕様を作成する
機能仕様では、エンドポイントが提供する機能とその検索キーを指定します。 また、取得した機能に適用する必要な関数とそのバインディングも指定します。 詳細については、「FeatureSpec
を作成する」を参照してください。
from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient
fe = FeatureEngineeringClient()
features=[
FeatureLookup(
table_name=feature_table_name,
lookup_key="destination_id"
),
FeatureFunction(
udf_name=function_name,
output_name="distance",
input_bindings={
"latitude": "latitude",
"longitude": "longitude",
"user_latitude": "user_latitude",
"user_longitude": "user_longitude"
},
),
]
feature_spec_name = f"main.on_demand_demo.travel_spec"
# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
ステップ5. Feature Serving エンドポイントを作成する
Feature Servingエンドポイントを作成するには、ここに示す UI の Create an endpoint、REST API、または Databricks SDKを使用できます。
Feature Serving エンドポイントは、ステップ 4 で作成したfeature_spec
をパラメーターとして受け取ります。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
# Create endpoint
endpoint_name = "fse-location"
try:
status = workspace.serving_endpoints.create_and_wait(
name=endpoint_name,
config = EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name,
scale_to_zero_enabled=True,
workload_size="Small"
)
]
)
)
print(status)
# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)
ステップ6. Feature Serving エンドポイントのクエリ
エンドポイントをクエリするときは、プライマリキーと、必要に応じて関数が使用するコンテキストデータを指定します。この例では、関数はユーザーの現在地 (緯度と経度) を入力として受け取ります。ユーザーの位置は常に変化しているため、推論時にコンテキスト特徴として関数に提供する必要があります。
また、UI を使用してエンドポイントをクエリし、UI またはREST API を使用してエンドポイントをクエリ することもできます。
わかりやすくするために、この例では 2 つの都市までの距離のみを計算します。より現実的なシナリオでは、特徴量テーブルの各位置からのユーザーの距離を計算して、推奨する都市を決定する場合があります。
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
endpoint=endpoint_name,
inputs={
"dataframe_records": [
{"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
{"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
]
},
)
pprint(response)
ノートブックの例
手順の完全な図については、次のノートブックを参照してください。
Feature Serving オンラインテーブルを含むノートブックの例
追加情報
特徴エンジニアリング Python API の使用の詳細については、 リファレンス ドキュメントを参照してください。