特徴量ビュー
プレビュー
この機能は パブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
Feature Views を使用すると、データソースから機能を定義およびコンピュートできます。機能は、さまざまなソース (Deltaテーブル、Kafkaストリーム、およびリクエスト時データ) と計算 (時間ウィンドウ集計、シンプルな列選択など) を使用して定義できます。このガイドでは、次のワークフローについて説明します:
-
特徴量開発 ワークフロー
create_featureを使用して、モデルのトレーニングとサービング ワークフローで使用できる Unity Catalog 特徴量オブジェクトを定義します。- あるいは、
Featureオブジェクトをローカルで構築し、後でregister_featureを使用してそれらをUnity Catalogに永続化することもできます。 ローカルで構築された特徴量は、登録前にcreate_training_setと組み合わせて使用できます。
-
モデルトレーニング ワークフロー
- 機械学習における特定時点の集約特徴量を計算するには、
create_training_setを使用します。特徴量ビューを使用したトレーニングに関する詳細なドキュメントについては、特徴量ビューを使用してモデルをトレーニングするを参照してください。
- 機械学習における特定時点の集約特徴量を計算するには、
-
特徴量のマテリアライズとサービング のワークフロー
create_featureで特徴量を定義するか、get_featureを使用して特徴量を取得した後、materialize_featuresを使用して、その特徴量または特徴量のセットをオフライン ストアにマテリアライズして効率的に再利用したり、オンライン ストアにマテリアライズしてオンラインで提供したりできます。- マテリアライズドビューで
create_training_set使用して、オフライン バッチ トレーニング データセットを準備します。
API の詳細については、特徴量ビュー API リファレンスを参照してください。
要件
-
サーバレスコンピュート、または Databricks Runtime 17.0 ML以降を実行しているクラシックコンピュートクラスター。
-
カスタムPythonパッケージをインストールする必要があります。ノートブックを実行するたびに、次のコード行を実行します。
Python%pip install databricks-feature-engineering>=0.16.0
dbutils.library.restartPython()
クイックスタートの例
実行可能なクイックスタートノートブックについては、 「サンプルノートブック」を参照してください。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)
# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)
sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)
fe = FeatureEngineeringClient()
# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()
# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()
# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)
# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)
# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
),
)
# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)
サンプルノートブック
Feature Views クイックスタート ノートブック
ストリーミング特徴量
Delta テーブルからのバッチ特徴量に加えて、リアルタイムユースケース向けにストリーミングソースから特徴量を定義できます。ストリーミング特徴量は、バッチ特徴量と同じ Feature クラスを使用します。つまり、同じFeatureコンストラクタ、同じ集計関数、同じトレーニングおよびサービングのワークフローを使用するため、バッチからリアルタイムへのアップグレードに必要なコード変更は最小限で済みます。マテリアライズされると、ストリーミング特徴量は、1秒未満のエンドツーエンドの鮮度 (p99 レイテンシー 200ms) をモデルサービングエンドポイントに直接配信します。
ストリーミング機能を使用するには、まずストリームを設定し、次にStreamSourceを使用してそれを参照します。ストリームソースは入力をKafkaとしてサポートし、トレーニング用のデータの履歴コピーとしてインジェスト (Delta) テーブルを自動的に維持します。
ストリーミング特徴量を定義する
StreamSourceは、3部構成の名前(catalog.schema.stream_name)でストリームを参照します。ストリームはUnity Catalogのセキュリティ保護可能なオブジェクトではありませんが、Unity Catalogスキーマにスコープされ、アクセスはストリームの取り込みテーブルによって管理されます。エンティティ、時系列、および関数定義内のカラム参照は、Kafkaメッセージのどの部分を読み取るかを示すためにvalue.またはkey.でプレフィックスされる必要があります。ネストされたフィールドは、ドット表記を使用してサポートされています(例:value.user.address.city)。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
StreamSource,
Feature,
AggregationFunction,
Sum,
RollingWindow,
)
from datetime import timedelta
client = FeatureEngineeringClient()
stream_source = StreamSource(
full_name="my_catalog.my_schema.my_stream",
)
feature = Feature(
name="user_purchase_sum",
source=stream_source,
entity=["value.user_id"],
timeseries_column="value.event_time",
function=AggregationFunction(
operator=Sum(input="value.amount"),
time_window=RollingWindow(window_duration=timedelta(hours=1)),
),
)
StreamSource上のフィルター条件
集約の前にストリームから行をフィルターするには、DeltaTableSource と同様に filter_condition を使用します。
stream_source = StreamSource(
full_name="my_catalog.my_schema.my_stream",
filter_condition="value.event_type = 'purchase'",
)
ストリームからの列の選択
「ColumnSelection」の機能は、ストリーミングソースと連携します。選択した列は、時点の精度を維持しながら、各エンティティについてストリームから取得した最新の値を表します。
from databricks.feature_engineering.entities import ColumnSelection
passenger_count = Feature(
name="passenger_count",
source=stream_source,
entity=["value.user_id"],
timeseries_column="value.event_time",
function=ColumnSelection(column="value.passenger_count"),
)
ネストされたフィールドにアクセスする
ネストされたJSONフィールドは、ドット表記を使用してアクセスできます(例:value.nested_field.amount)。サービング時、リクエストペイロードとレスポンスはリーフノード名を使用します(例:value.amountの代わりにamount)。モデルまたはFeature Spec内のすべてのエンティティ、時系列、および特徴量出力カラム全体で、リーフノード名は一意である必要があります。なぜなら、サービングエンドポイントは値をルーティングするためにリーフ名を使用するからです。
ストリーミング特徴量の時間枠
ストリーミング特徴量は、集計に対して RollingWindow のみをサポートします。ローリングウィンドウは最新のデータに対して継続的に再計算され、これはストリーミングソースのリアルタイムの性質と一致します。TumblingWindow と SlidingWindow は、固定された履歴間隔でのバッチ計算向けに設計されています。
ストリーミング特徴量ノートブックの例
ストリーミングFeature Viewsクイックスタートノートブック
モデルのトレーニングと推論
log_model()、score_batch()、およびcreate_training_set()などの特徴量ビューを使用してモデルをトレーニングし、バッチ推論を実行するには、特徴量ビューを使用したモデルのトレーニングを参照してください。
特徴量のマテリアライズ
特徴量を定義したら、オフラインストアまたはオンラインストアにマテリアライズして、トレーニングおよびサービングのワークフローで効率的に再利用できます。特徴量をマテリアライズした後、CPUモデルサービングを使用してモデルを提供できます。詳細については、「特徴量ビューのマテリアライズ」を参照してください。
ベストプラクティス
特徴量の命名
- ビジネスに不可欠な特徴量にはわかりやすい名前を使用します。
- チーム全体で一貫した命名規則に従ってください。
- 特徴量の開発を始める際は、自動生成された名前を使用してください。
時間枠
- ウィンドウの境界をビジネス サイクル (日次、週次) に合わせます。
- 短い期間で最新のトレンドを捉えることはできますが、ノイズが多くなる場合があります。より長い期間を対象とすると、特徴分布はより安定するが、最近の行動変化を見逃す可能性がある。ユースケースに応じて、基となる信号が変化する速度に基づいて選択してください。例えば、7日間のウィンドウは日々の変動を平滑化し、一貫性のあるモデル入力を生成する一方、1時間のウィンドウは行動の変化に迅速に対応できるものの、モデルのパフォーマンスを低下させるようなばらつきを生じさせる可能性がある。分布が変化するとモデルの精度が低下する場合は、入力値を安定させるために、より長いウィンドウを使用してください。
- 回転式窓やスライド式窓は、巻き上げ式(連続式)窓よりも拡張性に優れている。ほとんどの用途では、まずスライド式窓から始めてみましょう。
パフォーマンス
- データスキャンを最小限に抑えるため、同じデータソースからの特徴量を単一の
materialize_features呼び出しでマテリアライズします。 - 同じデータソース上のフィーチャーには、同じ粒度(例えば、1時間単位または1日単位のスライド期間)を使用することで、データ化時のグループ化をより適切に行うことができます。
エンティティ列とフィルター条件
同じソース テーブルの機能を操作する場合は、次の決定ガイドを使用します。
異なる集計レベルが必要な場合は、 entity ( create_feature上) を使用してください。
- 顧客レベルの特徴量 (顧客ごとに 1 行):
entity=["customer_id"] - 顧客と販売者の特徴量 (顧客ごとに複数の行):
entity=["customer_id", "merchant_id"] - 異なる集計レベルで同じ
DeltaTableSourceを共有できます 。各特徴量定義で異なるentity値を指定してください。
同じ集計レベルで行をフィルタリングする必要がある場合は、 filter_condition (on DeltaTableSource ) を使用してください。
- 高額取引のみ :
filter_condition="amount > 100"(顧客ごとに集計されます) - 完了した注文のみ :
filter_condition="status = 'completed'"(顧客ごとに集計されます)
経験則として、 変更によってエンティティ値ごとの行数が変わる場合は、特徴量定義で異なるentity値を使用してください。同じ集計に寄与する行をフィルタリングするだけであれば、ソースでfilter_conditionを使用してください。
一般的なパターン
顧客分析
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]
トレンド分析
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)
季節パターン
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)
制限事項
create_training_setAPIで使用する場合、トレーニング(ラベル付き)データセットと特徴量定義の間で、エンティティと時系列列の名前が一致している必要があります。- トレーニング データセットの
label列として使用される列名は、Feature定義に使用されるソース テーブルに存在してはなりません。 create_featureAPIでは、限られた関数 ( UDAF ) がサポートされています。 サポートされている関数を参照してください。- エンティティ列は、型
DATEまたはTIMESTAMPにすることはできません。 RequestSourceScalarDataType(INTEGER、FLOAT、BOOLEAN、STRING、DOUBLE、LONG、TIMESTAMP、DATE、SHORT) で定義されたスカラーデータ型のみをサポートします。配列、マップ、構造体などの複雑な型はサポートされていません。RequestSource集計関数や時間枠はサポートしていません。使用できる関数はColumnSelectionのみです。- トレーニングセットまたはサービス提供エンドポイント内のすべてのソースにおいて、エンティティ列名、時系列列名、およびリクエスト特徴量列名のセットは、グローバルに一意である必要があります。
マテリアライズに関する制限事項については、 「制限事項」を参照してください。