メインコンテンツまでスキップ

特徴量ビュー

備考

プレビュー

この機能は パブリック プレビュー段階です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

Feature Views を使用すると、データソースから機能を定義およびコンピュートできます。機能は、さまざまなソース (Deltaテーブル、Kafkaストリーム、およびリクエスト時データ) と計算 (時間ウィンドウ集計、シンプルな列選択など) を使用して定義できます。このガイドでは、次のワークフローについて説明します:

  • 特徴量開発 ワークフロー

    • create_featureを使用して、モデルのトレーニングとサービング ワークフローで使用できる Unity Catalog 特徴量オブジェクトを定義します。
    • あるいは、 Featureオブジェクトをローカルで構築し、後でregister_featureを使用してそれらをUnity Catalogに永続化することもできます。 ローカルで構築された特徴量は、登録前にcreate_training_setと組み合わせて使用できます。
  • モデルトレーニング ワークフロー

  • 特徴量のマテリアライズとサービング のワークフロー

    • create_featureで特徴量を定義するか、 get_featureを使用して特徴量を取得した後、 materialize_featuresを使用して、その特徴量または特徴量のセットをオフライン ストアにマテリアライズして効率的に再利用したり、オンライン ストアにマテリアライズしてオンラインで提供したりできます。
    • マテリアライズドビューでcreate_training_set使用して、オフライン バッチ トレーニング データセットを準備します。

API の詳細については、特徴量ビュー API リファレンスを参照してください。

要件

  • サーバレスコンピュート、または Databricks Runtime 17.0 ML以降を実行しているクラシックコンピュートクラスター。

  • カスタムPythonパッケージをインストールする必要があります。ノートブックを実行するたびに、次のコード行を実行します。

    Python
    %pip install databricks-feature-engineering>=0.16.0
    dbutils.library.restartPython()

クイックスタートの例

実行可能なクイックスタートノートブックについては、 「サンプルノートブック」を参照してください。

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta

CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"

# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)

# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)

sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)

fe = FeatureEngineeringClient()

# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()

# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()

# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)

# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)

# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()

# training code

fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)

# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)

# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
),
)

# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)

サンプルノートブック

Feature Views クイックスタート ノートブック

ノートブックを新しいタブで開く

ストリーミング特徴量

Delta テーブルからのバッチ特徴量に加えて、リアルタイムユースケース向けにストリーミングソースから特徴量を定義できます。ストリーミング特徴量は、バッチ特徴量と同じ Feature クラスを使用します。つまり、同じFeatureコンストラクタ、同じ集計関数、同じトレーニングおよびサービングのワークフローを使用するため、バッチからリアルタイムへのアップグレードに必要なコード変更は最小限で済みます。マテリアライズされると、ストリーミング特徴量は、1秒未満のエンドツーエンドの鮮度 (p99 レイテンシー 200ms) をモデルサービングエンドポイントに直接配信します。

ストリーミング機能を使用するには、まずストリームを設定し、次にStreamSourceを使用してそれを参照します。ストリームソースは入力をKafkaとしてサポートし、トレーニング用のデータの履歴コピーとしてインジェスト (Delta) テーブルを自動的に維持します。

ストリーミング特徴量を定義する

StreamSourceは、3部構成の名前(catalog.schema.stream_name)でストリームを参照します。ストリームはUnity Catalogのセキュリティ保護可能なオブジェクトではありませんが、Unity Catalogスキーマにスコープされ、アクセスはストリームの取り込みテーブルによって管理されます。エンティティ、時系列、および関数定義内のカラム参照は、Kafkaメッセージのどの部分を読み取るかを示すためにvalue.またはkey.でプレフィックスされる必要があります。ネストされたフィールドは、ドット表記を使用してサポートされています(例:value.user.address.city)。

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
StreamSource,
Feature,
AggregationFunction,
Sum,
RollingWindow,
)
from datetime import timedelta

client = FeatureEngineeringClient()

stream_source = StreamSource(
full_name="my_catalog.my_schema.my_stream",
)

feature = Feature(
name="user_purchase_sum",
source=stream_source,
entity=["value.user_id"],
timeseries_column="value.event_time",
function=AggregationFunction(
operator=Sum(input="value.amount"),
time_window=RollingWindow(window_duration=timedelta(hours=1)),
),
)

StreamSource上のフィルター条件

集約の前にストリームから行をフィルターするには、DeltaTableSource と同様に filter_condition を使用します。

Python
stream_source = StreamSource(
full_name="my_catalog.my_schema.my_stream",
filter_condition="value.event_type = 'purchase'",
)

ストリームからの列の選択

ColumnSelection」の機能は、ストリーミングソースと連携します。選択した列は、時点の精度を維持しながら、各エンティティについてストリームから取得した最新の値を表します。

Python
from databricks.feature_engineering.entities import ColumnSelection

passenger_count = Feature(
name="passenger_count",
source=stream_source,
entity=["value.user_id"],
timeseries_column="value.event_time",
function=ColumnSelection(column="value.passenger_count"),
)

ネストされたフィールドにアクセスする

ネストされたJSONフィールドは、ドット表記を使用してアクセスできます(例:value.nested_field.amount)。サービング時、リクエストペイロードとレスポンスはリーフノード名を使用します(例:value.amountの代わりにamount)。モデルまたはFeature Spec内のすべてのエンティティ、時系列、および特徴量出力カラム全体で、リーフノード名は一意である必要があります。なぜなら、サービングエンドポイントは値をルーティングするためにリーフ名を使用するからです。

ストリーミング特徴量の時間枠

ストリーミング特徴量は、集計に対して RollingWindow のみをサポートします。ローリングウィンドウは最新のデータに対して継続的に再計算され、これはストリーミングソースのリアルタイムの性質と一致します。TumblingWindowSlidingWindow は、固定された履歴間隔でのバッチ計算向けに設計されています。

ストリーミング特徴量ノートブックの例

ストリーミングFeature Viewsクイックスタートノートブック

ノートブックを新しいタブで開く

モデルのトレーニングと推論

log_model()score_batch()、およびcreate_training_set()などの特徴量ビューを使用してモデルをトレーニングし、バッチ推論を実行するには、特徴量ビューを使用したモデルのトレーニングを参照してください。

特徴量のマテリアライズ

特徴量を定義したら、オフラインストアまたはオンラインストアにマテリアライズして、トレーニングおよびサービングのワークフローで効率的に再利用できます。特徴量をマテリアライズした後、CPUモデルサービングを使用してモデルを提供できます。詳細については、「特徴量ビューのマテリアライズ」を参照してください。

ベストプラクティス

特徴量の命名

  • ビジネスに不可欠な特徴量にはわかりやすい名前を使用します。
  • チーム全体で一貫した命名規則に従ってください。
  • 特徴量の開発を始める際は、自動生成された名前を使用してください。

時間枠

  • ウィンドウの境界をビジネス サイクル (日次、週次) に合わせます。
  • 短い期間で最新のトレンドを捉えることはできますが、ノイズが多くなる場合があります。より長い期間を対象とすると、特徴分布はより安定するが、最近の行動変化を見逃す可能性がある。ユースケースに応じて、基となる信号が変化する速度に基づいて選択してください。例えば、7日間のウィンドウは日々の変動を平滑化し、一貫性のあるモデル入力を生成する一方、1時間のウィンドウは行動の変化に迅速に対応できるものの、モデルのパフォーマンスを低下させるようなばらつきを生じさせる可能性がある。分布が変化するとモデルの精度が低下する場合は、入力値を安定させるために、より長いウィンドウを使用してください。
  • 回転式窓やスライド式窓は、巻き上げ式(連続式)窓よりも拡張性に優れている。ほとんどの用途では、まずスライド式窓から始めてみましょう。

パフォーマンス

  • データスキャンを最小限に抑えるため、同じデータソースからの特徴量を単一のmaterialize_features呼び出しでマテリアライズします。
  • 同じデータソース上のフィーチャーには、同じ粒度(例えば、1時間単位または1日単位のスライド期間)を使用することで、データ化時のグループ化をより適切に行うことができます。

エンティティ列とフィルター条件

同じソース テーブルの機能を操作する場合は、次の決定ガイドを使用します。

異なる集計レベルが必要な場合は、 entity ( create_feature上) を使用してください。

  • 顧客レベルの特徴量 (顧客ごとに 1 行): entity=["customer_id"]
  • 顧客と販売者の特徴量 (顧客ごとに複数の行): entity=["customer_id", "merchant_id"]
  • 異なる集計レベルで同じDeltaTableSourceを共有できます 。各特徴量定義で異なるentity値を指定してください。

同じ集計レベルで行をフィルタリングする必要がある場合は、 filter_condition (on DeltaTableSource ) を使用してください。

  • 高額取引のみ : filter_condition="amount > 100" (顧客ごとに集計されます)
  • 完了した注文のみ : filter_condition="status = 'completed'" (顧客ごとに集計されます)

経験則として、 変更によってエンティティ値ごとの行数が変わる場合は、特徴量定義で異なるentity値を使用してください。同じ集計に寄与する行をフィルタリングするだけであれば、ソースでfilter_conditionを使用してください。

一般的なパターン

顧客分析

Python
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow

fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),

# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),

# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]

トレンド分析

Python
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)

季節パターン

Python
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)

制限事項

  • create_training_set APIで使用する場合、トレーニング(ラベル付き)データセットと特徴量定義の間で、エンティティと時系列列の名前が一致している必要があります。
  • トレーニング データセットのlabel列として使用される列名は、 Feature定義に使用されるソース テーブルに存在してはなりません。
  • create_feature APIでは、限られた関数 ( UDAF ) がサポートされています。 サポートされている関数を参照してください。
  • エンティティ列は、型DATEまたはTIMESTAMPにすることはできません。
  • RequestSource ScalarDataType ( INTEGERFLOATBOOLEANSTRINGDOUBLELONGTIMESTAMPDATESHORT ) で定義されたスカラーデータ型のみをサポートします。配列、マップ、構造体などの複雑な型はサポートされていません。
  • RequestSource 集計関数や時間枠はサポートしていません。使用できる関数はColumnSelectionのみです。
  • トレーニングセットまたはサービス提供エンドポイント内のすべてのソースにおいて、エンティティ列名、時系列列名、およびリクエスト特徴量列名のセットは、グローバルに一意である必要があります。

マテリアライズに関する制限事項については、 「制限事項」を参照してください。