Feature Servingエンドポイントのデプロイとクエリ
この記事ではFeature Servingエンドポイントをデプロイしてクエリを実行する方法を段階的に説明します。 この記事では Databricks SDK を使用します。 一部のステップは、 REST APIまたはDatabricks UI を使用して完了することもでき、それらのメソッドのドキュメントへの参照が含まれます。
この例では、都市の位置 (緯度と経度) が記載されたテーブルと、それらの都市からのユーザーの現在の距離を考慮するレコメンデーション アプリがあります。 ユーザーの位置は常に変化するため、推定時にユーザーと各都市の間の距離を計算する必要があります。 このチュートリアルでは、Databricks Online Tables と Databricks Feature Serving を使用して、低レイテンシでこれらの計算を実行する方法を説明します。 完全なサンプルコードについては、サンプルノートブックを参照してください。
ステップ1. ソーステーブルを作成する
ソース テーブルには、事前計算された特徴値が含まれており、主キーを持つDelta Unity Catalog内の任意の テーブルにすることができます。この例では、テーブルには都市とその緯度と経度のリストが含まれています。 主キーは destination_id
です。 サンプルデータを以下に示します。
名前 |
destination_id (pk) |
緯度 |
経度 |
---|---|---|---|
テネシー州ナッシュビル |
0 |
36.162663 |
-86.7816 |
ハワイ州ホノルル |
1 |
21.309885 |
-157.85814 |
ネバダ州ラスベガス |
2 |
36.171562 |
-115.1391 |
ニューヨーク州ニューヨーク |
3 |
40.712776 |
-74.005974 |
ステップ2. オンラインテーブルを作成する
オンライン テーブルは、オンライン アクセス用に最適化されたDeltaテーブルの読み取り専用コピーです。 詳細については、 「リアルタイムFeature Servingにオンライン テーブルを使用する」を参照してください。
オンライン テーブルを作成するには、次の例のように、UI、 REST API 、または Databricks SDK を使用してオンライン テーブルを作成します 。
from pprint import pprint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import mlflow
workspace = WorkspaceClient()
# Create an online table
feature_table_name = f"main.on_demand_demo.location_features"
online_table_name=f"main.on_demand_demo.location_features_online"
spec = OnlineTableSpec(
primary_key_columns=["destination_id"],
source_table_full_name = feature_table_name,
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({'triggered': 'true'}),
perform_full_copy=True)
# ignore "already exists" error
try:
online_table_pipeline = workspace.online_tables.create(name=online_table_name, spec=spec)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
pprint(workspace.online_tables.get(online_table_name))
ステップ3. Unity Catalogで関数を作成する
この例では、関数は目的地 (場所が変わらない) とユーザー (場所が頻繁に変化し、推論の時点までわからない) の間の距離を計算します。
# Define the function. This function calculates the distance between two locations.
function_name = f"main.on_demand_demo.distance"
spark.sql(f"""
CREATE OR REPLACE FUNCTION {function_name}(latitude DOUBLE, longitude DOUBLE, user_latitude DOUBLE, user_longitude DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON AS
$$
import math
lat1 = math.radians(latitude)
lon1 = math.radians(longitude)
lat2 = math.radians(user_latitude)
lon2 = math.radians(user_longitude)
# Earth's radius in kilometers
radius = 6371
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
distance = radius * c
return distance
$$""")
ステップ4. Unity Catalogで機能仕様を作成する
機能仕様は、エンドポイントがサービスを提供する機能とそのルックアップキーを指定します。 また、取得した地物とそのバインディングに適用するために必要な関数も指定します。 詳細については、「 FeatureSpec の作成」を参照してください。
from databricks.feature_engineering import FeatureLookup, FeatureFunction, FeatureEngineeringClient
fe = FeatureEngineeringClient()
features=[
FeatureLookup(
table_name=feature_table_name,
lookup_key="destination_id"
),
FeatureFunction(
udf_name=function_name,
output_name="distance",
input_bindings={
"latitude": "latitude",
"longitude": "longitude",
"user_latitude": "user_latitude",
"user_longitude": "user_longitude"
},
),
]
feature_spec_name = f"main.on_demand_demo.travel_spec"
# The following code ignores errors raised if a feature_spec with the specified name already exists.
try:
fe.create_feature_spec(name=feature_spec_name, features=features, exclude_columns=None)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
ステップ5. Feature Servingエンドポイントを作成する
Feature Servingエンドポイントを作成するには、ここに示す UI の「エンドポイントの作成」 、 REST API 、またはDatabricks SDK使用できます。
Feature Servingエンドポイントは、ステップ 4 で作成した feature_spec
をコンテナーとして受け取ります。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedEntityInput
# Create endpoint
endpoint_name = "fse-location"
try:
status = workspace.serving_endpoints.create_and_wait(
name=endpoint_name,
config = EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=feature_spec_name,
scale_to_zero_enabled=True,
workload_size="Small"
)
]
)
)
print(status)
# Get the status of the endpoint
status = workspace.serving_endpoints.get(name=endpoint_name)
print(status)
ステップ6. Feature Servingエンドポイントをクエリする
エンドポイントをクエリするときは、主キーと、必要に応じて関数が使用するコンテキストデータを指定します。 この例では、関数はユーザーの現在地 (緯度と経度) を入力として受け取ります。 ユーザーの場所は常に変化しているため、推論時にコンテキスト特徴として関数に提供する必要があります。
UI を使用してエンドポイントをクエリすることもできます。UI または REST API を使用してエンドポイントをクエリします 。
わかりやすくするために、この例では 2 つの都市までの距離のみを計算します。 より現実的なシナリオでは、特徴量テーブル内の各場所からのユーザーの距離を計算して、推奨する都市を決定することが考えられます。
import mlflow.deployments
client = mlflow.deployments.get_deploy_client("databricks")
response = client.predict(
endpoint=endpoint_name,
inputs={
"dataframe_records": [
{"destination_id": 1, "user_latitude": 37, "user_longitude": -122},
{"destination_id": 2, "user_latitude": 37, "user_longitude": -122},
]
},
)
pprint(response)
追加情報
特徴エンジニアリング Python API の使用の詳細については、リファレンス ドキュメントを参照してください。