
Declarative feature engineering and managed pipelines

Beta

This feature is Beta and is available in the following regions: us-east-1 and us-west-2.

The feature store declarative APIs enable you to define and compute time-windowed aggregation features from data sources. This guide covers the following workflows:

  • Feature development workflow
    • Use create_feature to define Unity Catalog feature objects that can be used in model training and serving workflows.
  • Model training workflow
    • Use create_training_set to compute point-in-time aggregated features for machine learning. This returns a training set object that can produce a Spark DataFrame with the computed features joined to the observation dataset for model training.
    • Call log_model with this training set to save the model in Unity Catalog, along with lineage between the feature and model objects.
    • score_batch uses Unity Catalog lineage to run the feature definition code and join point-in-time correct feature aggregations to the inference dataset for model scoring.
  • Feature materialization and serving workflow
    • After defining a feature with create_feature or retrieving it using get_feature, you can use materialize_features to materialize the feature or set of features to an offline store for efficient reuse, or to an online store for online serving.
    • Use create_training_set with the materialized view to prepare an offline batch training dataset.

For detailed documentation on log_model and score_batch, see Use features to train models.

Requirements

  • A classic compute cluster running Databricks Runtime 17.0 ML or above.

  • You must install the databricks-feature-engineering Python package. Run the following commands each time the notebook is started:

    Python
    %pip install databricks-feature-engineering>=0.14.0
    dbutils.library.restartPython()

Quickstart example

Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, TumblingWindow, SlidingWindow, OfflineStoreConfig
from datetime import timedelta
import mlflow

CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"

# 1. Create data source
source = DeltaTableSource(
    catalog_name=CATALOG_NAME,
    schema_name=SCHEMA_NAME,
    table_name=TABLE_NAME,
    entity_columns=["user_id"],
    timeseries_column="transaction_time",
)

# 2. Define features
fe = FeatureEngineeringClient()
features = [
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        name="avg_transaction_30d",
        source=source,
        inputs=["amount"],
        function=Avg(),
        time_window=TumblingWindow(window_duration=timedelta(days=30)),
    ),
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        source=source,
        inputs=["amount"],
        function=Sum(),
        time_window=SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1)),
        # name auto-generated: "amount_sum_continuous_7d"
    ),
]

# 3. Create training set using declarative features

# `labeled_df` should have the columns "user_id", "transaction_time", and "target".
# It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
    df=labeled_df,
    features=features,
    label="target",
)
training_set.load_df().display()  # action: joins labeled_df with the computed features

# 4. Train model
with mlflow.start_run():
    training_df = training_set.load_df()

    # training code

    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
    )

# 5. (Optional) Materialize features for serving
fe.materialize_features(
    features=features,
    offline_config=OfflineStoreConfig(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        table_name_prefix="customer_features",
    ),
    pipeline_state="ACTIVE",
    cron_schedule="0 0 * * * ?",  # Hourly (Quartz syntax)
)
note

After materializing features, you can serve models using CPU model serving. For details on online serving, see Materialize and serve declarative features.

Data sources

DeltaTableSource

note

Permitted data types for timeseries_column: TimestampType, DateType. Integer types may also work, but they cause a loss of precision for time-window aggregates.

The following code shows an example using the main.analytics.user_events table from Unity Catalog:

Python
from databricks.feature_engineering.entities import DeltaTableSource

source = DeltaTableSource(
    catalog_name="main",             # Catalog name
    schema_name="analytics",         # Schema name
    table_name="user_events",        # Table name
    entity_columns=["user_id"],      # Join keys, used to look up features for an entity
    timeseries_column="event_time",  # Timestamp for time windows
)

Declarative feature API

create_feature() API

FeatureEngineeringClient.create_feature() provides comprehensive validation and ensures proper feature construction:

Python
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource
inputs: List[str], # Required: List of column names from the source
function: Union[Function, str], # Required: Aggregation function (Sum, Avg, Count, etc.)
time_window: TimeWindow, # Required: TimeWindow for aggregation
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
name: Optional[str], # Optional: Feature name (auto-generated if omitted)
description: Optional[str], # Optional: Feature description
filter_condition: Optional[str], # Optional: SQL WHERE clause to filter source data
) -> Feature

Parameters:

  • source: The data source used in feature computation
  • inputs: List of column names from the source to use as input for aggregation
  • function: The aggregation function (Function instance or string name). See list of supported functions below.
  • time_window: The time window for aggregation (TimeWindow instance or dict with 'duration' and optional 'offset')
  • catalog_name: The catalog name for the feature
  • schema_name: The schema name for the feature
  • name: Optional feature name (auto-generated if omitted)
  • description: Optional description of the feature
  • filter_condition: Optional SQL WHERE clause to filter source data before aggregation. Example: "status = 'completed'", "transaction" = "Credit" AND "amount > 100"

Returns: A validated Feature instance

Raises: ValueError if any validation fails

Auto-generated names

When name is omitted, names follow the pattern: {column}_{function}_{window}. For example:

  • price_avg_continuous_1h (1-hour average price)
  • transaction_count_continuous_30d_1d (30-day count of transaction with 1d offset from event timestamp)
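The pattern can be sketched as a small helper in plain Python. This is an illustrative reconstruction only; `auto_feature_name` is a hypothetical function, and the actual generated names come from the API.

```python
def auto_feature_name(column, function, window_kind, duration, offset=None):
    """Illustrative sketch of the {column}_{function}_{window} naming pattern."""
    parts = [column, function, window_kind, duration]
    if offset is not None:
        parts.append(offset)  # offset suffix appears only when an offset is set
    return "_".join(parts)

auto_feature_name("price", "avg", "continuous", "1h")                 # 'price_avg_continuous_1h'
auto_feature_name("transaction", "count", "continuous", "30d", "1d")  # 'transaction_count_continuous_30d_1d'
```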

Supported functions

note

All functions are applied over an aggregation time-window as described in the time windows section below.

| Function | Shorthand | Description | Example use case |
| --- | --- | --- | --- |
| Sum() | "sum" | Total of values | Per-user daily app usage in minutes |
| Avg() | "avg", "mean" | Average of values | Mean transaction amount |
| Count() | "count" | Number of records | Number of logins per user |
| Min() | "min" | Minimum value | Lowest heart rate recorded by a wearable device |
| Max() | "max" | Maximum value | Maximum basket size of items per session |
| StddevPop() | "stddev_pop" | Population standard deviation | Daily transaction amount variability across all customers |
| StddevSamp() | "stddev_samp" | Sample standard deviation | Variability of ad campaign click-through rates |
| VarPop() | "var_pop" | Population variance | Spread of sensor readings for IoT devices in a factory |
| VarSamp() | "var_samp" | Sample variance | Spread of movie ratings over a sampled group |
| ApproxCountDistinct(relativeSD=0.05) | "approx_count_distinct"* | Approximate unique count | Distinct count of items purchased |
| ApproxPercentile(percentile=0.95, accuracy=100) | N/A | Approximate percentile | p95 response latency |
| First() | "first" | First value | First login timestamp |
| Last() | "last" | Last value | Most recent purchase amount |

*Functions with parameters use default values when using string shorthand.

The following example shows window-aggregation features defined over the same data source.

Python
from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct

fe = FeatureEngineeringClient()
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
    source=source,
    inputs=["product_id"],
    function=ApproxCountDistinct(relativeSD=0.01),
    ...
)

Features with filter conditions

The declarative feature APIs also support SQL filters, applied as a WHERE clause during aggregation. Filters are useful when a large source table contains a superset of the data needed for feature computation, and they minimize the need to create separate views on top of such tables.

Python
from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta

# Only aggregate high-value transactions
high_value_sales = fe.create_feature(
    catalog_name="main",
    schema_name="ecommerce",
    source=transactions,
    inputs=["amount"],
    function=Sum(),
    time_window=ContinuousWindow(window_duration=timedelta(days=30)),
    filter_condition="amount > 100",  # Only transactions over $100
)

# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
    catalog_name="main",
    schema_name="ecommerce",
    source=orders,
    inputs=["order_id"],
    function=Count(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7)),
    filter_condition="status = 'completed' AND payment_method = 'credit_card'",
)

Time windows

Feature engineering declarative APIs support three different window types to control lookback behavior for time window-based aggregations: continuous, tumbling, and sliding.

  • Continuous windows look back from the event time. Duration and offset are explicitly defined.
  • Tumbling windows are fixed, non-overlapping time windows. Each data point belongs to exactly one window.
  • Sliding windows are overlapping, rolling time windows with a configurable slide interval.

The following illustration shows how they work.

Continuous, tumbling, and sliding lookback windows.

Continuous window

Continuous windows are up-to-date and real-time aggregates, typically used over streaming data. In streaming pipelines, the continuous window emits a new row only when the contents of the fixed-length window change, such as when an event enters or leaves. When a continuous window feature is used in training pipelines, an accurate point-in-time feature calculation is performed on the source data using the fixed-length window duration immediately preceding a specific event's timestamp. This helps prevent online-offline skew or data leakage. Features at time T aggregate events from [T − duration, T).

Python
class ContinuousWindow(TimeWindow):
    window_duration: datetime.timedelta
    offset: Optional[datetime.timedelta] = None

The following table lists the parameters for a continuous window. The window start and end times are based on these parameters as follows:

  • Start time: evaluation_time - window_duration + offset (inclusive)
  • End time: evaluation_time + offset (exclusive)

| Parameter | Constraints |
| --- | --- |
| offset (optional) | Must be ≤ 0 (moves the window backward in time from the end timestamp). Use offset to account for any system delay between when an event is created and the event timestamp it is assigned, to prevent future events from leaking into training datasets. For example, if events land in the source table one minute after they are created, set offset=timedelta(minutes=-1). |
| window_duration | Must be > 0 |

Python
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta

# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))

Define a continuous window with an offset using the code below.

Python
# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
    window_duration=timedelta(days=7),
    offset=timedelta(days=-1),
)

Continuous window examples

  • window_duration=timedelta(days=7), offset=timedelta(days=0): This creates a 7-day lookback window ending at the current evaluation time. For an event at 2:00 PM on Day 7, this includes all events from 2:00 PM on Day 0 up to (but not including) 2:00 PM on Day 7.

  • window_duration=timedelta(hours=1), offset=timedelta(minutes=-30): This creates a 1-hour lookback window ending 30 minutes before the evaluation time. For an event at 3:00 PM, this includes all events from 1:30 PM up to (but not including) 2:30 PM. This is useful to account for data ingestion delays.
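The boundary rules above can be sketched in plain Python. This is an illustration of the window arithmetic only; `continuous_window_bounds` is a hypothetical helper, not part of the API.

```python
from datetime import datetime, timedelta

def continuous_window_bounds(evaluation_time, window_duration, offset=timedelta(0)):
    """Bounds of a continuous window:
    [evaluation_time - window_duration + offset, evaluation_time + offset).
    The start is inclusive; the end is exclusive."""
    end = evaluation_time + offset
    start = end - window_duration
    return start, end

# 1-hour window ending 30 minutes before a 3:00 PM event
start, end = continuous_window_bounds(
    datetime(2024, 1, 7, 15, 0),
    window_duration=timedelta(hours=1),
    offset=timedelta(minutes=-30),
)
# start == 1:30 PM (inclusive), end == 2:30 PM (exclusive)
```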

Tumbling window

For features defined using tumbling windows, aggregations are computed over pre-determined fixed-length windows that advance by the window duration, producing non-overlapping windows that fully partition time. As a result, each event in the source contributes to exactly one window. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.

Python
class TumblingWindow(TimeWindow):
    window_duration: datetime.timedelta

The following table lists the parameters for a tumbling window.

| Parameter | Constraints |
| --- | --- |
| window_duration | Must be > 0 |

Python
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta

window = TumblingWindow(
    window_duration=timedelta(days=7),
)

Tumbling window example

  • window_duration=timedelta(days=5): This creates pre-determined fixed-length windows of 5 days each. Example: Window #1 spans Day 0 to Day 4, Window #2 spans Day 5 to Day 9, Window #3 spans Day 10 to Day 14, and so on. Specifically, Window #1 includes all events with timestamps starting at 00:00:00.00 on Day 0 up to (but not including) any events with timestamp 00:00:00.00 on Day 5. Each event belongs to exactly one window.
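Because tumbling windows align to the Unix epoch, the unique window containing any event can be computed directly. A minimal sketch in plain Python; `tumbling_window_start` is an illustrative helper, not part of the API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window_start(ts, window_duration):
    """Start of the unique tumbling window containing ts.
    Windows are [start, start + window_duration), aligned to the Unix epoch."""
    n = (ts - EPOCH) // window_duration  # whole windows elapsed since the epoch
    return EPOCH + n * window_duration

ts = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
start = tumbling_window_start(ts, timedelta(days=5))
end = start + timedelta(days=5)
assert start <= ts < end  # the event belongs to exactly this window
```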

Sliding window

For features defined using sliding windows, aggregations are computed over a pre-determined fixed-length window that advances by a slide interval, producing overlapping windows. Each event in the source can contribute to feature aggregation for multiple windows. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.

Python
class SlidingWindow(TimeWindow):
    window_duration: datetime.timedelta
    slide_duration: datetime.timedelta

The following table lists the parameters for a sliding window.

| Parameter | Constraints |
| --- | --- |
| window_duration | Must be > 0 |
| slide_duration | Must be > 0 and < window_duration |

Python
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta

window = SlidingWindow(
    window_duration=timedelta(days=7),
    slide_duration=timedelta(days=1),
)

Sliding window example

  • window_duration=timedelta(days=5), slide_duration=timedelta(days=1): This creates overlapping 5-day windows that advance by 1 day each time. Example: Window #1 spans Day 0 to Day 4, Window #2 spans Day 1 to Day 5, Window #3 spans Day 2 to Day 6, and so on. Each window includes events from 00:00:00.00 on the start day up to (but not including) 00:00:00.00 on the end day. Because windows overlap, a single event can belong to multiple windows (in this example, each event belongs to up to 5 different windows).
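The overlap behavior can be sketched in plain Python. `sliding_windows_containing` is an illustrative helper (not part of the API) that enumerates every window an event falls into.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def sliding_windows_containing(ts, window_duration, slide_duration):
    """All sliding windows [start, start + window_duration) containing ts.
    Window starts align to the Unix epoch and advance by slide_duration."""
    # Latest window start at or before ts
    start = EPOCH + ((ts - EPOCH) // slide_duration) * slide_duration
    windows = []
    while start + window_duration > ts:  # the window end must fall after ts
        windows.append((start, start + window_duration))
        start -= slide_duration
    return windows

ts = datetime(2024, 1, 10, 6, 0, tzinfo=timezone.utc)
windows = sliding_windows_containing(ts, timedelta(days=5), timedelta(days=1))
# With a 5-day window sliding by 1 day, the event belongs to 5 overlapping windows
```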

API methods

create_training_set()

Join features with labeled data for ML training:

Python
FeatureEngineeringClient.create_training_set(
    df: DataFrame,                                # DataFrame with training data
    features: Optional[List[Feature]],            # List of Feature objects
    label: Union[str, List[str], None],           # Label column name(s)
    exclude_columns: Optional[List[str]] = None,  # Optional: columns to exclude

    # The API also supports creating training sets from materialized feature tables and functions
) -> TrainingSet

Call TrainingSet.load_df() to get the original training data joined with point-in-time, dynamically computed features.

Requirements for df argument:

  • Must contain all entity_columns from feature data sources
  • Must contain timeseries_column from feature data sources
  • Should contain label column(s)

Point-in-time correctness: Features are computed using only source data available before each row's timestamp, preventing future data from leaking into model training. Computations leverage Spark's window functions for efficiency.
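The point-in-time semantics can be illustrated with a toy, pure-Python aggregation (illustrative only; the actual computation happens in Spark, and `point_in_time_sum` is a hypothetical helper):

```python
from datetime import datetime, timedelta

# Toy source events: (entity, event_time, amount)
events = [
    ("u1", datetime(2024, 1, 1), 10.0),
    ("u1", datetime(2024, 1, 5), 20.0),
    ("u1", datetime(2024, 1, 6), 30.0),  # same timestamp as the observation below
]

def point_in_time_sum(events, entity, observation_time, window_duration):
    """Sum of amounts in [observation_time - window_duration, observation_time).
    Events at or after the observation timestamp are excluded, preventing leakage."""
    start = observation_time - window_duration
    return sum(
        amount
        for e, t, amount in events
        if e == entity and start <= t < observation_time
    )

# Only the Jan 1 and Jan 5 events are visible to a Jan 6 observation
total = point_in_time_sum(events, "u1", datetime(2024, 1, 6), timedelta(days=7))
# total == 30.0
```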

log_model()

Log a model with feature metadata for lineage tracking and automatic feature lookup during inference:

Python
FeatureEngineeringClient.log_model(
    model,                                 # Trained model object
    artifact_path: str,                    # Path to store model artifact
    flavor: ModuleType,                    # MLflow flavor module (e.g., mlflow.sklearn)
    training_set: TrainingSet,             # TrainingSet used for training
    registered_model_name: Optional[str],  # Optional: register model in Unity Catalog
)

The flavor parameter specifies the MLflow model flavor module to use, such as mlflow.sklearn or mlflow.xgboost.

Models logged with a TrainingSet automatically track lineage to the features used in training. For detailed documentation, see Use features to train models.

score_batch()

Perform batch inference with automatic feature lookup:

Python
FeatureEngineeringClient.score_batch(
    model_uri: str,  # URI of logged model
    df: DataFrame,   # DataFrame with entity keys and timestamps
) -> DataFrame

score_batch uses the feature metadata stored with the model to automatically compute point-in-time correct features for inference, ensuring consistency with training. For detailed documentation, see Use features to train models.

Best practices

Feature naming

  • Use descriptive names for business-critical features.
  • Follow consistent naming conventions across teams.
  • Let auto-generation handle exploratory features.

Time windows

  • Use offsets to exclude unstable recent data.
  • Align window boundaries with business cycles (daily, weekly).
  • Consider data freshness vs. feature stability tradeoffs.

Performance

  • Group features by data source to minimize data scans.
  • Use appropriate window sizes for your use case.

Testing

  • Test time window boundaries with known data scenarios.

Common patterns

Customer analytics

Python
fe = FeatureEngineeringClient()
features = [
    # Recency: number of transactions in the last day
    fe.create_feature(
        catalog_name="main", schema_name="ecommerce", source=transactions,
        inputs=["transaction_id"], function=Count(),
        time_window=ContinuousWindow(window_duration=timedelta(days=1)),
    ),

    # Frequency: transaction count over the last 90 days
    fe.create_feature(
        catalog_name="main", schema_name="ecommerce", source=transactions,
        inputs=["transaction_id"], function=Count(),
        time_window=ContinuousWindow(window_duration=timedelta(days=90)),
    ),

    # Monetary: total spend in the last month
    fe.create_feature(
        catalog_name="main", schema_name="ecommerce", source=transactions,
        inputs=["amount"], function=Sum(),
        time_window=ContinuousWindow(window_duration=timedelta(days=30)),
    ),
]

Trend analysis

Python
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7)),
)

historical_avg = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7)),
)

Seasonal patterns

Python
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
    catalog_name="main", schema_name="ecommerce",
    source=transactions, inputs=["amount"], function=Avg(),
    time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4)),
)

Limitations

  • The names of the entity and timeseries columns must match between the training (labeled) dataset and the source tables when used in the create_training_set API.
  • The column name used as the label column in the training dataset must not exist in the source tables used to define features.
  • Only a limited set of aggregation functions (UDAFs) is supported in the create_feature API.