宣言的な特徴
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
宣言型フィーチャー エンジニアリングAPIs使用すると、データ ソースからフィーチャーを定義およびコンピュートできます。 機能は、さまざまな ( Deltaテーブルやリクエスト時間データ) および計算 (時間ウィンドウ集計、単純な列選択など) を使用して定義できます。 このガイドでは、以下のワークフローについて説明します。
-
機能開発 ワークフロー
create_featureを使用して、モデルのトレーニングとサービング ワークフローで使用できる Unity Catalog 機能オブジェクトを定義します。- あるいは、
Featureオブジェクトをローカルで構築し、後でregister_featureを使用してそれらをUnity Catalogに永続化することもできます。 ローカルで構築された特徴は、登録前にcreate_training_setと組み合わせて使用できます。
-
モデルトレーニング ワークフロー
- 機械学習における特定時点の集約特徴量を計算するには、
create_training_setを使用します。宣言的機能を使用したトレーニングに関する詳細なドキュメントについては、 「宣言的機能を使用してモデルをトレーニングする」を参照してください。
- 機械学習における特定時点の集約特徴量を計算するには、
-
機能の実現と提供の ワークフロー
create_featureで機能を定義するか、get_featureを使用して機能を取得した後、materialize_featuresを使用して、その機能または機能のセットをオフライン ストアに具体化して効率的に再利用したり、オンライン ストアに具体化してオンラインで提供したりできます。- マテリアライズドビューで
create_training_set使用して、オフライン バッチ トレーニング データセットを準備します。
APIの詳細については、 「宣言型機能APIリファレンス」を参照してください。
要件
-
Databricks Runtime 17.0 ML以降を実行するクラシック コンピュートクラスター。
-
カスタムPythonパッケージをインストールする必要があります。ノートブックを実行するたびに、次のコード行を実行します。
Python%pip install databricks-feature-engineering>=0.15.0
dbutils.library.restartPython()
クイックスタートの例
実行可能なクイックスタートノートブックについては、 「サンプルノートブック」を参照してください。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, DeltaTableSource, Feature, AggregationFunction,
MaterializedFeaturePipelineScheduleState,
Sum, Avg, ColumnSelection, TableTrigger,
TumblingWindow, SlidingWindow,
OfflineStoreConfig, OnlineStoreConfig,
)
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
)
# 2. Define features locally (no catalog/schema needed yet)
avg_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), TumblingWindow(window_duration=timedelta(days=30))),
name="avg_transaction_30d",
)
sum_feature = Feature(
source=source,
entity=["user_id"],
timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))),
# name auto-generated: "amount_sum_sliding_7d_1d"
)
fe = FeatureEngineeringClient()
# 3. Explore features with compute_features
feature_df = fe.compute_features(features=[avg_feature, sum_feature])
feature_df.display()
# 4. Create training set using local features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
training_set = fe.create_training_set(
df=labeled_df,
features=[avg_feature, sum_feature],
label="target",
)
training_set.load_df().display()
# 5. Register features in Unity Catalog
avg_feature = fe.register_feature(
feature=avg_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
sum_feature = fe.register_feature(
feature=sum_feature,
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
)
# 6. Or use create_feature for a one-step define-and-register workflow
latest_amount = fe.create_feature(
source=source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="latest_amount",
)
# 7. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 8. (Optional) Materialize features for serving
# Features must be registered in UC before calling materialize_features
online_config = OnlineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store",
)
# Aggregation features use CronSchedule and support both offline and online configs
fe.materialize_features(
features=[avg_feature, sum_feature],
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features",
),
online_config=online_config,
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
# ColumnSelection features use TableTrigger and only support online config
fe.materialize_features(
features=[latest_amount],
online_config=online_config,
trigger=TableTrigger(),
)
サンプルノートブック
宣言型機能クイックスタートノートブック
モデルのトレーニングと推論
log_model() 、 score_batch() 、およびcreate_training_set()などの宣言型機能を使用してモデルをトレーニングし、実行バッチ推論を実行するには、「宣言型機能を使用したモデルのトレーニング」を参照してください。
機能の具体化
機能を定義したら、それらをオフラインまたはオンラインストアに具体化することで、トレーニングやサービス提供のワークフローにおいて効率的に再利用できます。フィーチャーを具体化した後、CPU モデルサービングを使用してモデルを提供できます。 詳細については、 「Materialize の宣言型機能」を参照してください。
ベストプラクティス
機能の命名
- ビジネスに不可欠な機能にはわかりやすい名前を使用します。
- チーム全体で一貫した命名規則に従ってください。
- 機能開発を始める際は、自動生成された名前を使用してください。
時間枠
- ウィンドウの境界をビジネス サイクル (日次、週次) に合わせます。
- 短い期間で最新のトレンドを捉えることはできますが、ノイズが多くなる場合があります。より長い期間を対象とすると、特徴分布はより安定するが、最近の行動変化を見逃す可能性がある。ユースケースに応じて、基となる信号が変化する速度に基づいて選択してください。例えば、7日間のウィンドウは日々の変動を平滑化し、一貫性のあるモデル入力を生成する一方、1時間のウィンドウは行動の変化に迅速に対応できるものの、モデルのパフォーマンスを低下させるようなばらつきを生じさせる可能性がある。分布が変化するとモデルの精度が低下する場合は、入力値を安定させるために、より長いウィンドウを使用してください。
- 回転式窓やスライド式窓は、巻き上げ式(連続式)窓よりも拡張性に優れている。ほとんどの用途では、まずスライド式窓から始めてみましょう。
パフォーマンス
- データスキャンを最小限に抑えるため、同じデータソースからの機能を単一の
materialize_features呼び出しで具体化します。 - 同じデータソース上のフィーチャーには、同じ粒度(例えば、1時間単位または1日単位のスライド期間)を使用することで、データ化時のグループ化をより適切に行うことができます。
エンティティ列とフィルター条件
同じソース テーブルの機能を操作する場合は、次の決定ガイドを使用します。
異なる集計レベルが必要な場合は、 entity ( create_feature上) を使用してください。
- 顧客レベルの機能 (顧客ごとに 1 行):
entity=["customer_id"] - 顧客と販売者の機能 (顧客ごとに複数の行):
entity=["customer_id", "merchant_id"] - 異なる集計レベルで同じ
DeltaTableSourceを共有できます 。各機能定義で異なるentity値を指定してください。
同じ集計レベルで行をフィルタリングする必要がある場合は、 filter_condition (on DeltaTableSource ) を使用してください。
- 高額取引のみ :
filter_condition="amount > 100"(顧客ごとに集計されます) - 完了した注文のみ :
filter_condition="status = 'completed'"(顧客ごとに集計されます)
経験則として、 変更によってエンティティ値ごとの行数が変わる場合は、フィーチャ定義で異なるentity値を使用してください。同じ集計に寄与する行をフィルタリングするだけであれば、ソースでfilter_conditionを使用してください。
一般的なパターン
顧客分析
from databricks.feature_engineering.entities import AggregationFunction, Sum, Count, RollingWindow
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=1)))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Count(input="transaction_id"), RollingWindow(window_duration=timedelta(days=90)))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions,
entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Sum(input="amount"), RollingWindow(window_duration=timedelta(days=30)))),
]
トレンド分析
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7))),
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=7), delay=timedelta(days=7))),
)
季節パターン
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, entity=["user_id"], timeseries_column="transaction_time",
function=AggregationFunction(Avg(input="amount"), RollingWindow(window_duration=timedelta(days=1), delay=timedelta(weeks=4))),
)
制限事項
create_training_setAPIで使用する場合、トレーニング(ラベル付き)データセットと特徴定義の間で、エンティティと時系列列の名前が一致している必要があります。- トレーニング データセットの
label列として使用される列名は、Feature定義に使用されるソース テーブルに存在してはなりません。 create_featureAPIでは、限られた関数 ( UDAFs ) がサポートされています。 サポートされている機能を参照してください。- エンティティ列は、型
DATEまたはTIMESTAMPにすることはできません。 RequestSourceScalarDataType(INTEGER、FLOAT、BOOLEAN、STRING、DOUBLE、LONG、TIMESTAMP、DATE、SHORT) で定義されたスカラーデータ型のみをサポートします。配列、マップ、構造体などの複雑な型はサポートされていません。RequestSource集計関数や時間枠はサポートしていません。使用できる関数はColumnSelectionのみです。- トレーニングセットまたはサービス提供エンドポイント内のすべてのソースにおいて、エンティティ列名、時系列列名、およびリクエスト特徴量列名のセットは、グローバルに一意である必要があります。
物質化に関する制限事項については、 「制限事項」を参照してください。