メインコンテンツまでスキップ

宣言的な特徴

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

宣言型フィーチャー エンジニアリングAPIs使用すると、データ ソースからフィーチャーを定義およびコンピュートできます。 機能は、さまざまな ( Deltaテーブルやリクエスト時間データ) および計算 (時間ウィンドウ集計、単純な列選択など) を使用して定義できます。 このガイドでは、以下のワークフローについて説明します。

  • 機能開発 ワークフロー

    • create_featureを使用して、モデルのトレーニングとサービング ワークフローで使用できる Unity Catalog 機能オブジェクトを定義します。
    • あるいは、 Featureオブジェクトをローカルで構築し、後でregister_featureを使用してそれらをUnity Catalogに永続化することもできます。 ローカルで構築された特徴は、登録前にcreate_training_setと組み合わせて使用できます。
  • モデルトレーニング ワークフロー

  • 機能の実現と提供の ワークフロー

    • create_featureで機能を定義するか、 get_featureを使用して機能を取得した後、 materialize_featuresを使用して、その機能または機能のセットをオフライン ストアに具体化して効率的に再利用したり、オンライン ストアに具体化してオンラインで提供したりできます。
    • マテリアライズドビューでcreate_training_set使用して、オフライン バッチ トレーニング データセットを準備します。

APIの詳細については、 「宣言型機能APIリファレンス」を参照してください。

要件

  • Databricks Runtime 17.0 ML以降を実行するクラシック コンピュートクラスター。

  • カスタムPythonパッケージをインストールする必要があります。ノートブックを実行するたびに、次のコード行を実行します。

    Python
    %pip install databricks-feature-engineering>=0.15.0
    dbutils.library.restartPython()

クイックスタートの例

実行可能なクイックスタートノートブックについては、 「サンプルノートブック」を参照してください。

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
MaterializedFeaturePipelineScheduleState,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta

CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"

# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)

# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)

sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)

fe = FeatureEngineeringClient()

# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()

# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()

# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)

# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)

# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()

# training code

fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)

# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)

# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)

# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)

サンプルノートブック

宣言型機能クイックスタートノートブック

ノートブックを新しいタブで開く

モデルのトレーニングと推論

log_model()score_batch() 、およびcreate_training_set()などの宣言型機能を使用してモデルをトレーニングし、実行バッチ推論を実行するには、「宣言型機能を使用したモデルのトレーニング」を参照してください。

機能の具体化

機能を定義したら、それらをオフラインまたはオンラインストアに具体化することで、トレーニングやサービス提供のワークフローにおいて効率的に再利用できます。フィーチャーを具体化した後、CPU モデルサービングを使用してモデルを提供できます。 詳細については、 「Materialize の宣言型機能」を参照してください。

ベストプラクティス

機能の命名

  • ビジネスに不可欠な機能にはわかりやすい名前を使用します。
  • チーム全体で一貫した命名規則に従ってください。
  • 機能開発を始める際は、自動生成された名前を使用してください。

時間枠

  • ウィンドウの境界をビジネス サイクル (日次、週次) に合わせます。
  • 短い期間で最新のトレンドを捉えることはできますが、ノイズが多くなる場合があります。より長い期間を対象とすると、特徴分布はより安定するが、最近の行動変化を見逃す可能性がある。ユースケースに応じて、基となる信号が変化する速度に基づいて選択してください。例えば、7日間のウィンドウは日々の変動を平滑化し、一貫性のあるモデル入力を生成する一方、1時間のウィンドウは行動の変化に迅速に対応できるものの、モデルのパフォーマンスを低下させるようなばらつきを生じさせる可能性がある。分布が変化するとモデルの精度が低下する場合は、入力値を安定させるために、より長いウィンドウを使用してください。
  • 回転式窓やスライド式窓は、巻き上げ式(連続式)窓よりも拡張性に優れている。ほとんどの用途では、まずスライド式窓から始めてみましょう。

パフォーマンス

  • データスキャンを最小限に抑えるため、同じデータソースからの機能を単一のmaterialize_features呼び出しで具体化します。
  • 同じデータソース上のフィーチャーには、同じ粒度(例えば、1時間単位または1日単位のスライド期間)を使用することで、データ化時のグループ化をより適切に行うことができます。

エンティティ列とフィルター条件

同じソース テーブルの機能を操作する場合は、次の決定ガイドを使用します。

異なる集計レベルが必要な場合は、 entity ( create_feature上) を使用してください。

  • 顧客レベルの機能 (顧客ごとに 1 行): entity=["customer_id"]
  • 顧客と販売者の機能 (顧客ごとに複数の行): entity=["customer_id", "merchant_id"]
  • 異なる集計レベルで同じDeltaTableSourceを共有できます 。各機能定義で異なるentity値を指定してください。

同じ集計レベルで行をフィルタリングする必要がある場合は、 filter_condition (on DeltaTableSource ) を使用してください。

  • 高額取引のみ : filter_condition="amount > 100" (顧客ごとに集計されます)
  • 完了した注文のみ : filter_condition="status = 'completed'" (顧客ごとに集計されます)

経験則として、 変更によってエンティティ値ごとの行数が変わる場合は、フィーチャ定義で異なるentity値を使用してください。同じ集計に寄与する行をフィルタリングするだけであれば、ソースでfilter_conditionを使用してください。

一般的なパターン

顧客分析

Python
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow

fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),

# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),

# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]

トレンド分析

Python
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)

historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)

季節パターン

Python
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)

制限事項

  • create_training_set APIで使用する場合、トレーニング(ラベル付き)データセットと特徴定義の間で、エンティティと時系列列の名前が一致している必要があります。
  • トレーニング データセットのlabel列として使用される列名は、 Feature定義に使用されるソース テーブルに存在してはなりません。
  • create_feature APIでは、限られた関数 ( UDAFs ) がサポートされています。 サポートされている機能を参照してください。
  • エンティティ列は、型DATEまたはTIMESTAMPにすることはできません。
  • RequestSource ScalarDataType ( INTEGERFLOATBOOLEANSTRINGDOUBLELONGTIMESTAMPDATESHORT ) で定義されたスカラーデータ型のみをサポートします。配列、マップ、構造体などの複雑な型はサポートされていません。
  • RequestSource 集計関数や時間枠はサポートしていません。使用できる関数はColumnSelectionのみです。
  • トレーニングセットまたはサービス提供エンドポイント内のすべてのソースにおいて、エンティティ列名、時系列列名、およびリクエスト特徴量列名のセットは、グローバルに一意である必要があります。

物質化に関する制限事項については、 「制限事項」を参照してください。