宣言的な機能を具体化する
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
Unity Catalogに保存される宣言的な機能定義を作成したら、機能定義を使用して ソース テーブルから機能データを生成できます。 このプロセスは、機能の具体化と呼ばれます。Databricks 、 LakeFlow Spark宣言型パイプラインを作成および管理し、モデルのトレーニングとバッチ スコアリングまたはオンライン サービスのためにUnity Catalogにテーブルを設定します。
宣言的機能の提供については、 「宣言的機能の提供」を参照してください。
要件
- 機能は宣言型機能APIを使用して作成し、 Unity Catalogに保存する必要があります。
- バージョン要件については、 「要件」を参照してください。
ColumnSelectionこれらの機能はオンラインストアに具体化できる。ColumnSelection の具体化を参照してください。RequestSource特徴量は推論時に提供されるデータを表すため、具体化することはできません。
APIデータ構造
OfflineStoreConfig
具体化された機能が書き込まれるオフラインストアの設定。materialize_featuresが呼び出されると、Feature Storeのバックエンドはこのプレフィックスを使用してテーブルを作成します。 各パイプライン実行時に、最新のフィーチャー値が実装スケジュールに従ってテーブルに実装されます。
OfflineStoreConfig(
catalog_name: str, # Catalog name for the offline table where materialized features will be stored
schema_name: str, # Schema name for the offline table
table_name_prefix: str # Table name prefix for the offline table. The pipeline may create multiple tables with this prefix, each updated at different cadences
)
from databricks.feature_engineering.entities import OfflineStoreConfig
offline_store = OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
)
OnlineStoreConfig
モデルサービングで使用される機能を保存するオンラインストアの設定。 マテリアライゼーションにより、 catalog.schema.table_name_prefixを使用してDeltaテーブルが作成され、同じ名前でオンラインFeature Storeにテーブルがストリーム配信されます。
from databricks.feature_engineering.entities import OnlineStoreConfig
online_store = OnlineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store"
)
MaterializedFeature
マテリアライズされた、つまりUnity Catalogで使用できる事前計算済みの表現を持つ宣言型機能を表します。 オフライン テーブルとオンライン テーブルには、別個のマテリアライズド フィーチャがあります。通常、ユーザーはMaterializedFeatureを直接インスタンス化しません。
API関数呼び出し
materialize_features()
宣言型機能のリストを、オフラインのDeltaテーブルまたはオンラインFeature Storeに具体化します。 この関数を呼び出す前に、 Unity Catalogに機能を登録する必要があります(たとえば、 create_featureまたはregister_featureを使用します)。 登録されていない現地で構築された設備は機能しません。
FeatureEngineeringClient.materialize_features(
features: List[Feature], # List of declarative features to materialize
offline_config: Optional[OfflineStoreConfig] = None, # Offline store config (aggregation features only)
online_config: Optional[OnlineStoreConfig] = None, # Online store config
trigger: Union[CronSchedule, TableTrigger], # Materialization trigger
) -> List[MaterializedFeature]:
このメソッドは、具体化された機能のリストを返します。このリストには、機能の値が更新された日時や、機能が具体化されているUnity Catalogテーブルに関するメタデータが含まれています。
OnlineStoreConfigとOfflineStoreConfigの両方が指定されている場合は、指定された機能ごとに 2 つの具体化された機能が返され、ストアの種類ごとに 1 つずつになります。
trigger問題は、いつ実体化パイプラインを実行するかを制御します。
CronSchedule:固定スケジュールで実行されます。集計機能に必要です(AggregationFunction)。TableTrigger: 上流のDeltaテーブルがコミットを受け取ったときに実行します。DeltaTableSourceによって支えられているColumnSelection機能に必要です。
ColumnSelectionと集計機能は異なるトリガータイプを必要とするため、単一のmaterialize_features呼び出しで混在させることはできません。代わりに、個別に呼び出しを行ってください。
オフラインストアに具体化
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, MaterializedFeaturePipelineScheduleState, OfflineStoreConfig,
)
fe = FeatureEngineeringClient()
materialized = fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
),
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
オンラインストアに具体化
オンラインストアに集約機能を実装するには、オフラインストアにも集約機能を実装する必要があります。offline_configとonline_configの両方が必要です。online_store_name既存のオンラインFeature Storeを参照する必要があります。 作成方法については、 Databricksオンライン機能ストアを参照してください。
ColumnSelection 機能にはOfflineStoreConfigは必要ありません。ColumnSelection の具体化を参照してください。
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
CronSchedule, MaterializedFeaturePipelineScheduleState,
OfflineStoreConfig, OnlineStoreConfig,
)
fe = FeatureEngineeringClient()
materialized = fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features"
),
online_config=OnlineStoreConfig(
catalog_name="main",
schema_name="feature_store",
table_name_prefix="customer_features_serving",
online_store_name="customer_features_store"
),
trigger=CronSchedule(
quartz_cron_expression="0 0 * * * ?", # Hourly
timezone_id="UTC",
pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
),
)
list_materialized_features()
ユーザーのUnity Catalogメタストア内のすべての具体化された機能のリストを返します。
デフォルトでは、最大 100 個の機能が返されます。この制限は、 max_results問題を使用して変更できます。
返された具体化された機能を機能名でフィルタリングするには、オプションのfeature_name問題を使用します。
FeatureEngineeringClient.list_materialized_features(
feature_name: Optional[str] = None, # Optional feature name to filter by
max_results: int = 100, # Maximum number of features to be returned
) -> List[MaterializedFeature]:
delete_materialized_feature()
具体化された機能を削除する前に、その機能を参照しているモデルや機能仕様をすべて削除または更新してください。
具体化されたフィーチャを削除します。渡すべき機能は、機能の種類によって異なります。
- 集約機能 :オフラインで具体化された機能を渡す。同じ機能に対してオンライン上に既に実装された機能が存在する場合、両方とも削除されます。
ColumnSelection機能 : オンライン具体化機能を渡す。ColumnSelection機能はオンラインストアにのみ実装されるため ( ColumnSelection の実装を参照)、対応するオフライン機能はありません。
具体化処理の一環として、効率化のために、フィーチャはデータソースと集計ウィンドウごとにグループ化されます。ColumnSelectionフィーチャーには集計ウィンドウがないため、データソースごとにのみグループ化されます。グループ化されたすべてのフィーチャーが削除されるまで、マテリアライゼーションパイプライン、オフラインテーブル、およびオンラインテーブルは削除されません。そのグループ内の最後に具体化されたフィーチャーが削除されると、関連するすべてのリソースがフィーチャー ストアによって自動的に削除されます。
マテリアライズドフィーチャーをクリーンアップするには、マテリアライズドフィーチャーに関連付けられているテーブルを参照してください。コンピュートおよびDeltaテーブル リソースがクリーンアップされる前に、テーブル内の各フィーチャ (列ごとに 1 つ) を削除する必要があります。
list_materialized_features()を使用してmaterialized_feature引数を取得します。
FeatureEngineeringClient.delete_materialized_feature(
materialized_feature: MaterializedFeature, # Required: The materialized feature to delete
) -> None
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import ColumnSelection
fe = FeatureEngineeringClient()
feature_names = [
"main.feature_store.amount_sum_sliding_7d_1d",
"main.feature_store.amount_sum_sliding_30d_1d",
"main.feature_store.transaction_count_sliding_7d_1d",
"main.feature_store.latest_transaction_amount",
"main.feature_store.latest_user_tier",
]
for name in feature_names:
feature = fe.get_feature(full_name=name)
for mf in fe.list_materialized_features(feature_name=name):
if isinstance(feature.function, ColumnSelection):
# ColumnSelection features only have online materializations. Delete the online materialized feature directly.
fe.delete_materialized_feature(materialized_feature=mf)
elif not mf.is_online:
# Aggregation features have both offline and online materializations. Delete the offline materialized feature to delete both.
fe.delete_materialized_feature(materialized_feature=mf)
# Online materialized aggregation features cannot be deleted directly. They are deleted via their paired offline materialized features.
列選択の具体化
ColumnSelection この機能は、集計を行わずに、エンティティキーごとに単一の列の最新値を選択します。それらはオンラインストアでのみ販売可能です。オフラインでの使用例(トレーニングおよびバッチ推論)では、 ColumnSelection特徴量はクエリ時にソースデータから直接取得されるため、オフラインでの具体化は不要です。
具現化挙動
- このパイプラインは、集計ウィンドウを使用せずに、エンティティキーごとに最新の行をオンラインテーブルに書き込みます。
- オンラインマテリアライゼーションは、エンティティキーごとに、オンラインテーブルに最新の値を入力します。
例
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
DeltaTableSource, Feature, ColumnSelection, TableTrigger, OnlineStoreConfig,
)
fe = FeatureEngineeringClient()
delta_source = DeltaTableSource(
catalog_name="catalog",
schema_name="schema",
table_name="transactions",
)
amount_feature = Feature(
source=delta_source,
function=ColumnSelection("amount"),
entity=["user_id"],
timeseries_column="transaction_time",
name="latest_transaction_amount",
)
# Register before materializing
amount_feature = fe.register_feature(
feature=amount_feature,
catalog_name="catalog",
schema_name="schema",
)
mfs = fe.materialize_features(
features=[amount_feature],
online_config=OnlineStoreConfig(
catalog_name="catalog",
schema_name="feats_online",
table_name_prefix="txn_",
online_store_name="lb_usw2"
),
trigger=TableTrigger(),
)
ColumnSelection この機能では、ソース Delta テーブルが新しいコミットを受け取るたびにパイプラインが実行されるTableTriggerを使用します。オフライン使用ケース(トレーニングおよびバッチ推論)では、 ColumnSelection特徴がソースから直接読み込まれるため、 offline_configは必要ありません。
RequestSource 特徴量は、推論時に呼び出し元から提供されたデータ(またはラベル付きDataFrameから抽出されたデータ)を表すため、具体化できません。 読み取るソース テーブルはありません。値はリクエスト ペイロードまたはDataFrameにのみ存在します。
制限事項
- バッチローリングウィンドウ機能は実装できません。時間の正確さの忠実度が高いため、オフライン トレーニングまたはバッチ推論用のローリング ウィンドウ機能がデータポイントごとにオンザフライで生成されます。
ColumnSelectionこれらの機能はオンラインストアでのみ実現可能です。RequestSource機能は実現できません。- マテリアライズドフィーチャーは、作成されたワークスペース内でのみ削除できます。
- 具体化された集計機能の場合、オンラインの具体化された機能を直接削除することはできません。ペアになっているオフラインのマテリアライズドフィーチャーを削除すると、その変更は両方に反映されます。
- 2026年4月20日より前に作成されたマテリアライズド集計機能については、パイプライン内のすべてのマテリアライズド機能が削除されるまで、マテリアライズドパイプラインは新しい機能値を生成し続け、その後リソースのクリーンアップがトリガーされます。機能ごとの削除をサポートする更新されたパイプラインを作成するには、機能を削除して再マテリアライズします。
- 具体化された
ColumnSelectionのフィーチャについては、パイプライン内のすべての具体化されたフィーチャが削除されるまで、具体化パイプラインは新しいフィーチャ値を生成し続け、削除されるとリソースのクリーンアップがトリガーされます。