宣言型特徴エンジニアリングとマネージドパイプライン
ベータ版
この機能はベータ版であり、次のリージョンで利用できます: us-east-1およびus-west-2 。
Feature Store の宣言型APIs使用すると、データ ソースから時間枠の集計機能を定義およびコンピュートできます。 このガイドでは、次のワークフローについて説明します。
-
機能開発 ワークフロー
create_featureを使用して、モデルのトレーニングとサービング ワークフローで使用できる Unity Catalog 機能オブジェクトを定義します。
-
モデルトレーニング ワークフロー
create_training_set使用して、機械学習の特定の時点における集約された特徴を計算します。これにより、モデルをトレーニングするための観測データセットに拡張されたコンピュート機能を備えたSpark DataFrameを返すことができるトレーニング セット オブジェクトが返されます。- このトレーニング セットで
log_model呼び出して、このモデルを機能オブジェクトとモデル オブジェクト間のリネージとともにUnity Catalogに保存します。 score_batchUnity Catalogリネージを使用して、機能定義コードを使用して、モデルのスコアリングのための推論データセットに追加されたポイントインタイムの正しい機能集約を実行します。
-
機能の実現と提供の ワークフロー
create_featureで機能を定義するか、get_featureを使用して機能を取得した後、materialize_featuresを使用して、その機能または機能のセットをオフライン ストアに具体化して効率的に再利用したり、オンライン ストアに具体化してオンラインで提供したりできます。- マテリアライズドビューで
create_training_set使用して、オフライン バッチ トレーニング データセットを準備します。
log_modelとscore_batch詳細なドキュメントについては、 「特徴を使用してモデルをトレーニングする」を参照してください。
要件
-
Databricks Runtime 17.0 ML以降を実行するクラシック コンピュートクラスター。
-
カスタム Python パッケージをインストールする必要があります。ノートブックが実行されるたびに、次のコード行を実行する必要があります。
Python%pip install databricks-feature-engineering>=0.14.0
dbutils.library.restartPython()
クイックスタートの例
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, TumblingWindow, SlidingWindow, OfflineStoreConfig
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
entity_columns=["user_id"],
timeseries_column="transaction_time"
)
# 2. Define features
fe = FeatureEngineeringClient()
features = [
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="avg_transaction_30d",
source=source,
inputs=["amount"],
function=Avg(),
time_window=TumblingWindow(window_duration=timedelta(days=30))
),
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
source=source,
inputs=["amount"],
function=Sum(),
time_window=SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))
# name auto-generated: "amount_sum_continuous_7d"
),
]
# 3. Create training set using declarative features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
# It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
df=labeled_df,
features=features,
label="target",
)
training_set.load_df().display() # action: joins labeled_df with computed feature
# 4. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 5. (Optional) Materialize features for serving
fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features"
),
pipeline_state="ACTIVE",
cron_schedule="0 0 * * * ?" # Hourly
)
フィーチャーを具体化した後、CPU モデルサービングを使用してモデルを提供できます。 オンライン サービングの詳細については、 「宣言的機能の実現と提供」を参照してください。
データソース
DeltaTableSource
timeseries_columnに許可されるデータ型: TimestampType、DateType。他の整数データ型も機能しますが、時間ウィンドウの集計では精度が低下します。
次のコードは、Unity Catalog のmain.analytics.user_eventsテーブルを使用する例を示しています。
from databricks.feature_engineering.entities import DeltaTableSource
source = DeltaTableSource(
catalog_name="main", # Catalog name
schema_name="analytics", # Schema name
table_name="user_events", # Table name
entity_columns=["user_id"], # Join keys, used to look up features for an entity
timeseries_column="event_time" # Timestamp for time windows
)
宣言型機能API
create_feature() API
FeatureEngineeringClient.create_feature() 包括的な検証を提供し、適切な機能構築を保証します。
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource
inputs: List[str], # Required: List of column names from the source
function: Union[Function, str], # Required: Aggregation function (Sum, Avg, Count, etc.)
time_window: TimeWindow, # Required: TimeWindow for aggregation
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
name: Optional[str], # Optional: Feature name (auto-generated if omitted)
description: Optional[str], # Optional: Feature description
filter_condition: Optional[str], # Optional: SQL WHERE clause to filter source data
) -> Feature
パラメーター:
source: 特徴計算に使用されるデータソースinputs: 集計の入力として使用するソースの列名のリストfunction: 集計関数 (関数インスタンスまたは文字列名)。サポートされている機能のリストを以下で参照してください。time_window: 集計のための時間ウィンドウ(TimeWindowインスタンスまたは「duration」とオプションの「offset」を持つ辞書)catalog_name: フィーチャのカタログ名schema_name: フィーチャのスキーマ名name: オプションの機能名(省略した場合は自動生成されます)description: 機能のオプションの説明filter_condition: 集計前にソース データをフィルター処理するためのオプションの SQL WHERE 句。例:"status = 'completed'"、"transaction" = "Credit" AND "amount > 100"
戻り値: 検証されたFeatureインスタンス
検証に失敗した場合は ValueError が 発生します
自動生成された名前
nameを省略すると、名前は{column}_{function}_{window}パターンに従います。例えば:
price_avg_continuous_1h(1時間平均価格)transaction_count_continuous_30d_1d(イベントタイムスタンプから1日間のオフセットを含む30日間のトランザクションカウント)
サポートされている機能
すべての関数は、以下の時間ウィンドウのセクションで説明されているように、集約時間ウィンドウに適用されます。
関数 | 速記 | 説明 | 使用例 |
|---|---|---|---|
|
| 値の合計 | ユーザーごとの1日あたりのアプリ使用時間(分) |
|
| 値の平均 | 平均取引額 |
|
| レコード数 | ユーザーあたりのログイン数 |
|
| 最小値 | ウェアラブルデバイスで記録された最低心拍数 |
|
| 最大値 | セッションあたりの最大バスケットサイズ |
|
| 母集団標準偏差 | 全顧客の日々の取引額の変動 |
|
| サンプル標準偏差 | 広告キャンペーンのクリックスルー率の変動 |
|
| 母集団分散 | 工場におけるIoTデバイスのセンサー読み取りの普及 |
|
| 標本分散 | サンプルグループにおける映画評価の分布 |
|
| おおよそのユニーク数 | 購入したアイテムの個別の数 |
| N/A | おおよそのパーセンタイル | p95応答潜時 |
|
| 最初の値 | 最初のログインタイムスタンプ |
|
| 最後の値 | 最近の購入金額 |
*問題のある関数は、文字列の省略表現を使用する場合は安全な値を使用します。
次の例は、同じデータソースに対して定義されたウィンドウ集約機能を示しています。
from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct
fe = FeatureEngineeringClient()
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
source=source,
inputs=["product_id"],
function=ApproxCountDistinct(relativeSD=0.01),
...
)
フィルター条件付き機能
宣言型機能APIs 、集計の WHERE 句として適用されるSQLフィルターの適用もサポートします。 フィルターは、機能の計算に必要なデータのスーパーセットを含む大規模なソース テーブルを操作するときに役立ち、これらのテーブルの上に個別のビューを作成する必要性を最小限に抑えます。
from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta
# Only aggregate high-value transactions
high_value_sales = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=transactions,
inputs=["amount"],
function=Sum(),
time_window=ContinuousWindow(window_duration=timedelta(days=30)),
filter_condition="amount > 100" # Only transactions over $100
)
# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=orders,
inputs=["order_id"],
function=Count(),
time_window=ContinuousWindow(window_duration=timedelta(days=7)),
filter_condition="status = 'completed' AND payment_method = 'credit_card'"
)
時間枠
機能エンジニアリングの宣言型APIs 、タイム ウィンドウ ベースの集計のルックバック動作を制御するために、連続、タンブリング、スライディングという 3 つの異なるウィンドウ タイプをサポートします。
- 連続ウィンドウはイベント時間から遡ります。期間とオフセットは明示的に定義されます。
- タンブリング ウィンドウは、重複しない固定の時間ウィンドウです。各データポイントは 1 つのウィンドウにのみ属します。
- スライディング ウィンドウは、設定可能なスライド間隔を持つ、重なり合うローリング タイム ウィンドウです。
次の図は、それらがどのように動作するかを示しています。

連続ウィンドウ
継続的なウィンドウは、最新のリアルタイムの集計であり、通常はストリーミング データで使用されます。ストリーミング パイプラインでは、イベントの入力や終了など、固定長ウィンドウの内容が変更された場合にのみ、連続ウィンドウによって新しい行が出力されます。トレーニング パイプラインで連続ウィンドウ機能を使用すると、特定のイベントのタイムスタンプの直前の固定長のウィンドウ期間を使用して、ソース データに対して正確なポイントインタイム機能の計算が実行されます。これにより、オンラインとオフラインの偏りやデータ漏洩を防ぐことができます。時刻Tの機能は、[ T − 期間、 T ) からのイベントを集約します。
class ContinuousWindow(TimeWindow):
window_duration: datetime.timedelta
offset: Optional[datetime.timedelta] = None
次の表に、連続ウィンドウの不安を示します。 ウィンドウの開始時間と終了時間は、次のようにこれらの懸念に基づいています。
- 開始時刻:
evaluation_time - window_duration + offset(含む) - 終了時間:
evaluation_time + offset(含まない)
パラメーター | 制約 |
|---|---|
| ≤ 0 である必要があります (ウィンドウを終了タイムスタンプから時間的に後方に移動します)。今後のイベントがトレーニング データセットに漏洩するのを防ぐために、 |
| 0より大きい必要があります |
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta
# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))
以下のコードを使用して、オフセット付きの連続ウィンドウを定義します。
# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
window_duration=timedelta(days=7),
offset=timedelta(days=-1)
)
連続ウィンドウの例
-
window_duration=timedelta(days=7), offset=timedelta(days=0): これにより、現在の評価時間で終了する 7 日間の遡及期間が作成されます。7 日目の午後 2 時のイベントの場合、0 日目の午後 2 時から 7 日目の午後 2 時までのすべてのイベント (ただし、この時間は含みません) が含まれます。 -
window_duration=timedelta(hours=1), offset=timedelta(minutes=-30): これにより、評価時間の 30 分前に終了する 1 時間のルックバック ウィンドウが作成されます。午後 3 時のイベントの場合、午後 1 時 30 分から午後 2 時 30 分までのすべてのイベントが含まれます (ただし、午後 2 時 30 分は含まれません)。これは、データ取り込み遅延に対するアカウントに役立ちます。
タンブリングウィンドウ
タンブリング ウィンドウを使用して定義された機能の場合、集計はスライド間隔で進む事前に決定された固定長のウィンドウ上でコンピュートされ、時間を完全に分割する重複しないウィンドウが生成されます。 その結果、ソース内の各イベントは 1 つのウィンドウにのみ寄与します。時刻tの機能は、 t (含まない) 以前に終了したウィンドウからのデータを集計します。Windows は Unix エポックからスタートします。
class TumblingWindow(TimeWindow):
window_duration: datetime.timedelta
次の表に、タンブリング ウィンドウの懸念を示します。
パラメーター | 制約 |
|---|---|
| 0より大きい必要があります |
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta
window = TumblingWindow(
window_duration=timedelta(days=7)
)
タンブリングウィンドウの例
window_duration=timedelta(days=5): これにより、それぞれ 5 日間の事前に決定された固定長のウィンドウが作成されます。例: ウィンドウ #1 は 0 日目から 4 日目まで、ウィンドウ #2 は 5 日目から 9 日目まで、ウィンドウ #3 は 10 日目から 14 日目まで、というようになります。具体的には、ウィンドウ #1 には、0 日目の00:00:00.00から 5 日目のタイムスタンプがであるイベントまで (ただし、00:00:00.00を含まない) のタイムスタンプを持つすべてのイベントが含まれます。各イベントは 1 つのウィンドウに属します。
引き戸
スライディング ウィンドウを使用して定義された機能の場合、集計は、スライド間隔で進む事前に決定された固定長のウィンドウ上でコンピュートされ、重複するウィンドウが生成されます。 ソース内の各イベントは、複数のウィンドウの特徴の集約に貢献できます。時刻tの機能は、 t (含まない) 以前に終了したウィンドウからのデータを集計します。Windows は Unix エポックからスタートします。
class SlidingWindow(TimeWindow):
window_duration: datetime.timedelta
slide_duration: datetime.timedelta
次の表に、スライディング ウィンドウの懸念を示します。
パラメーター | 制約 |
|---|---|
| 0より大きい必要があります |
| 0より大きく、<である必要があります |
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta
window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)
スライディングウィンドウの例
window_duration=timedelta(days=5), slide_duration=timedelta(days=1): これにより、毎回 1 日ずつ進む、重複する 5 日間のウィンドウが作成されます。例: ウィンドウ #1 は 0 日目から 4 日目まで、ウィンドウ #2 は 1 日目から 5 日目まで、ウィンドウ #3 は 2 日目から 6 日目まで、というようになります。各ウィンドウには、開始日の00:00:00.00から終了日の00:00:00.00まで (ただし、 は含みません) のイベントが含まれます。ウィンドウは重複するため、1 つのイベントが複数のウィンドウに属することができます (この例では、各イベントは最大 5 つの異なるウィンドウに属します)。
APIメソッド
create_training_set()
ML トレーニング用にラベル付きデータと特徴を結合します。
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
# API continues to support creating training set using materialized feature tables and functions
) -> TrainingSet
TrainingSet.load_dfを呼び出して、ポイントインタイムの動的コンピュート機能と結合された元のトレーニング データを取得します。
df引数の要件:
- フィーチャデータソースのすべての
entity_columns含める必要があります - フィーチャデータソースからの
timeseries_column含める必要があります - ラベル列を含める必要があります
ポイントインタイムの正確性: 将来のモデル トレーニングへのデータ漏洩を防ぐために、機能は各行のタイムスタンプの前に利用可能なソース データのみを使用してコンピュートされます。 計算では、効率化のために Spark のウィンドウ関数を活用します。
log_model()
推論中のリネージ追跡と自動特徴検索のために、特徴メタデータを含むモデルをログに記録します。
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)
flavorでは、使用するMLflowモデル フレーバーモジュール ( mlflow.sklearnやmlflow.xgboostなど) を指定します。
TrainingSetでログに記録されたモデルは、トレーニングで使用された機能へのリネージを自動的に追跡します。 詳細なドキュメントについては、 「特徴を使用してモデルをトレーニングする」を参照してください。
score_batch()
自動特徴検索によるバッチ推論を実行します。
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame
score_batch モデルとともに保存された特徴メタデータを使用して、推論用の特定時点の正しい特徴を自動的にコンピュートし、トレーニングとの一貫性を確保します。 詳細なドキュメントについては、 「特徴を使用してモデルをトレーニングする」を参照してください。
ベストプラクティス
機能の命名
- ビジネスに不可欠な機能にはわかりやすい名前を使用します。
- チーム全体で一貫した命名規則に従ってください。
- 探索的な機能を自動生成で処理します。
時間枠
- オフセットを使用して、不安定な最近のデータを除外します。
- ウィンドウの境界をビジネス サイクル (日次、週次) に合わせます。
- データの鮮度と機能の安定性のトレードオフを考慮してください。
パフォーマンス
- データスキャンを最小限に抑えるために、データソースごとに機能をグループ化します。
- ユースケースに応じて適切なウィンドウ サイズを使用します。
テスト
- 既知のデータ シナリオを使用して時間ウィンドウの境界をテストします。
一般的なパターン
顧客分析
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=1))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=90))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["amount"],
function=Sum(), time_window=ContinuousWindow(window_duration=timedelta(days=30)))
]
トレンド分析
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7))
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7))
)
季節パターン
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4))
)
制限事項
create_training_setAPI で使用する場合、エンティティ列と時系列列の名前は、トレーニング (ラベル付き) データセットとソース テーブル間で一致する必要があります。- トレーニング データセットの
label列として使用される列名は、Feature定義に使用されるソース テーブルに存在してはなりません。 create_featureAPIでは、限定された関数リスト ( UDAFs ) がサポートされています。