Declarative feature engineering and managed pipelines
This feature is in Beta and is available in the following regions: us-east-1 and us-west-2.
The feature store declarative APIs enable you to define and compute time-windowed aggregation features from data sources. This guide covers the following workflows:
- Feature development workflow
  - Use create_feature to define Unity Catalog feature objects that can be used in model training and serving workflows.
- Model training workflow
  - Use create_training_set to calculate point-in-time aggregated features for machine learning. This returns a training set object that can return a Spark DataFrame containing the observation dataset augmented with the computed features, for training a model.
  - Call log_model with this training set to save the model in Unity Catalog, along with lineage between the feature and model objects.
  - score_batch uses this Unity Catalog lineage to run the feature definition code and perform point-in-time correct feature aggregations on the inference dataset for model scoring.
- Feature materialization and serving workflow
  - After defining a feature with create_feature or retrieving it with get_feature, you can use materialize_features to materialize one or more features to an offline store for efficient reuse, or to an online store for online serving.
  - Use create_training_set with the materialized view to prepare an offline batch training dataset.
For detailed documentation on log_model and score_batch, see Use features to train models.
Requirements
- A classic compute cluster running Databricks Runtime 17.0 ML or above.
- The databricks-feature-engineering Python package, version 0.14.0 or above. Run the following lines each time the notebook runs:
%pip install "databricks-feature-engineering>=0.14.0"
dbutils.library.restartPython()
Quickstart example
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, TumblingWindow, SlidingWindow, OfflineStoreConfig
from datetime import timedelta
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
    catalog_name=CATALOG_NAME,
    schema_name=SCHEMA_NAME,
    table_name=TABLE_NAME,
    entity_columns=["user_id"],
    timeseries_column="transaction_time"
)
# 2. Define features
fe = FeatureEngineeringClient()
features = [
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        name="avg_transaction_30d",
        source=source,
        inputs=["amount"],
        function=Avg(),
        time_window=TumblingWindow(window_duration=timedelta(days=30))
    ),
    fe.create_feature(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        source=source,
        inputs=["amount"],
        function=Sum(),
        time_window=SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))
        # name omitted: auto-generated from the input column, function, and time window
    ),
]
# 3. Create training set using declarative features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
# It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
    df=labeled_df,
    features=features,
    label="target",
)
training_set.load_df().display()  # action: joins labeled_df with the computed features
# 4. Train model
with mlflow.start_run():
    training_df = training_set.load_df()
    # training code: fit `model` on training_df
    fe.log_model(
        model=model,
        artifact_path="recommendation_model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
    )
# 5. (Optional) Materialize features for serving
fe.materialize_features(
    features=features,
    offline_config=OfflineStoreConfig(
        catalog_name=CATALOG_NAME,
        schema_name=SCHEMA_NAME,
        table_name_prefix="customer_features"
    ),
    pipeline_state="ACTIVE",
    cron_schedule="0 0 * * * ?"  # Hourly
)
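The cron_schedule value in step 5 appears to use Quartz cron syntax (the trailing ? is a Quartz placeholder), in which the first field is seconds rather than minutes. The following sketch is a hypothetical helper, not part of databricks-feature-engineering, that labels each field so you can sanity-check a schedule before passing it in. It assumes Quartz field order and does not validate field values.

```python
# Assumed Quartz cron field order: seconds, minutes, hours,
# day-of-month, month, day-of-week, and an optional year.
QUARTZ_FIELDS = ["seconds", "minutes", "hours", "day_of_month", "month", "day_of_week", "year"]


def describe_quartz_cron(expression: str) -> dict:
    """Hypothetical helper: split a Quartz cron expression into named fields (no value checks)."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}: {expression!r}")
    # zip stops at the shorter list, so a 6-field expression has no "year" key.
    return dict(zip(QUARTZ_FIELDS, parts))


schedule = describe_quartz_cron("0 0 * * * ?")
print(schedule)
# {'seconds': '0', 'minutes': '0', 'hours': '*', 'day_of_month': '*', 'month': '*', 'day_of_week': '?'}
```

Read this way, "0 0 * * * ?" fires at second 0 of minute 0 of every hour, which matches the "Hourly" comment; "0 0 2 * * ?" would fire once a day at 02:00.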
After materializing features, you can serve models using CPU model serving. For details on online serving, see Materialize and serve declarative features.
Data sources
DeltaTableSource
Permitted data types for timeseries_column: TimestampType, DateType. Integer data types can also work, but they cause a loss of precision in time window aggregates.
The following code shows an example using the main.analytics.user_events table from Unity Catalog:
from databricks.feature_engineering.entities import DeltaTableSource
source = DeltaTableSource(
catalog_name="main", # Catalog name
schema_name="analytics", # Schema name
table_name="user_events", # Table name
entity_columns=["user_id"], # Join keys, used to look up features for an entity
timeseries_column="event_time" # Timestamp for time windows
)
Declarative feature API
create_feature() API
FeatureEngineeringClient.create_feature() validates its arguments and returns a Feature object:
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource
inputs: List[str], # Required: List of column names from the source
function: Union[Function, str], # Required: Aggregation function (Sum, Avg, Count, etc.)
time_window: TimeWindow, # Required: TimeWindow for aggregation
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
name: Optional[str], # Optional: Feature name (auto-generated if omitted)
description: Optional[str], # Optional: Feature description
filter_condition: Optional[str], # Optional: SQL WHERE clause to filter source data
) -> Feature
Parameters:
- source: The data source used in feature computation.
- inputs: List of column names from the source to use as input for aggregation.
- function: The aggregation function (Function instance or string name). See the list of supported functions below.
- time_window: The time window for aggregation (TimeWindow instance or dict with 'duration' and optional 'offset').
- catalog_name: The catalog name for the feature.
- schema_name: The schema name for the feature.
- name: Optional feature name (auto-generated if omitted).
- description: Optional description of the feature.
- filter_condition: Optional SQL WHERE clause to filter source data before aggregation. Examples: "status = 'completed'", "transaction = 'Credit' AND amount > 100".
Returns: A validated Feature instance
Raises: ValueError if any validation fails
Auto-generated names
When name is omitted, names follow the pattern: {column}_{function}_{window}. For example:
- price_avg_continuous_1h (1-hour average price)
- transaction_count_continuous_30d_1d (30-day count of transactions with a 1-day offset from the event timestamp)
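To make the naming pattern concrete, the following hypothetical helper (not part of the library) reproduces the two example names above. It assumes durations are rendered in the largest whole unit (d, h, or m) and that a negative offset is written without its sign, as in the second example. The library's exact rules, especially for tumbling and sliding windows, are not documented here, so treat the output as an approximation.

```python
from datetime import timedelta


def format_duration(d: timedelta) -> str:
    """Render a duration in its largest whole unit, for example '30d', '1h', or '90m'."""
    seconds = int(abs(d).total_seconds())  # sign is dropped, as in the documented example
    for unit, size in (("d", 86400), ("h", 3600), ("m", 60)):
        if seconds % size == 0:
            return f"{seconds // size}{unit}"
    return f"{seconds}s"


def auto_feature_name(column, function, window_type, duration, offset=None):
    """Hypothetical re-creation of the {column}_{function}_{window} pattern."""
    window = f"{window_type}_{format_duration(duration)}"
    if offset:  # None and timedelta(0) are both falsy, so no offset suffix is added
        window += f"_{format_duration(offset)}"
    return f"{column}_{function}_{window}"


print(auto_feature_name("price", "avg", "continuous", timedelta(hours=1)))
print(auto_feature_name("transaction", "count", "continuous", timedelta(days=30), timedelta(days=-1)))
```

The two calls print price_avg_continuous_1h and transaction_count_continuous_30d_1d. If you rely on generated names downstream, check them against the Feature objects the library returns rather than against this sketch.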
Supported functions
All functions are applied over an aggregation time-window as described in the time windows section below.
| Function | Shorthand | Description | Example use case |
|---|---|---|---|
| Sum() | | Total of values | Per-user daily app usage in minutes |
| Avg() | | Average of values | Mean transaction amount |
| Count() | | Number of records | Number of logins per user |
| | | Minimum value | Lowest heart rate recorded by a wearable device |
| Max() | | Maximum value | Maximum basket size (number of items) per session |
| | | Population standard deviation | Daily transaction amount variability across all customers |
| | | Sample standard deviation | Variability of ad campaign click-through rates |
| | | Population variance | Spread of sensor readings for IoT devices in a factory |
| | | Sample variance | Spread of movie ratings over a sampled group |
| ApproxCountDistinct() | | Approximate unique count | Distinct count of items purchased |
| | N/A | Approximate percentile | p95 response latency |
| | | First value | First login timestamp |
| | | Last value | Most recent purchase amount |
*Functions with parameters use default values when using string shorthand.
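The following plain-Python sketch shows what several of these aggregations return for the values that fall inside a single window for one entity. It uses the standard statistics module as a stand-in for the Spark aggregations the library actually runs, and the data is made up. The main point is the population versus sample distinction: the population variants divide by n, while the sample variants divide by n - 1.

```python
import statistics

# Made-up amounts that fall inside one window for one entity, ordered by timestamp.
window_values = [2, 4, 4, 4, 5, 5, 7, 9]

aggregates = {
    "sum": sum(window_values),
    "avg": statistics.mean(window_values),
    "count": len(window_values),
    "min": min(window_values),
    "max": max(window_values),
    "stddev_pop": statistics.pstdev(window_values),   # divides by n
    "stddev_samp": statistics.stdev(window_values),   # divides by n - 1
    "var_pop": statistics.pvariance(window_values),   # divides by n
    "var_samp": statistics.variance(window_values),   # divides by n - 1
    "count_distinct": len(set(window_values)),        # exact; an approximate function estimates this
    "first": window_values[0],                        # earliest value in the window
    "last": window_values[-1],                        # most recent value in the window
}

for name, value in aggregates.items():
    print(f"{name:>14}: {value}")
```

For this data the population variance is 4 while the sample variance is 32/7 (about 4.57), so the choice matters most for small windows with few events.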
The following example shows window-aggregation features defined over the same data source.
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct
fe = FeatureEngineeringClient()
# `...` stands for the remaining required arguments (time_window, catalog_name, schema_name)
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
    source=source,
    inputs=["product_id"],
    function=ApproxCountDistinct(relativeSD=0.01),
    ...
)
Features with filter conditions
The declarative feature APIs also support applying a SQL filter, which is applied as a WHERE clause in aggregations. Filters are useful when working with large source tables that include a superset of the data needed for feature computation, and they minimize the need to create separate views on top of these tables.
from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta
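# Assumes `fe` is a FeatureEngineeringClient and `transactions` and `orders` are DeltaTableSource objects
# Only aggregate high-value transactions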
high_value_sales = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=transactions,
inputs=["amount"],
function=Sum(),
time_window=ContinuousWindow(window_duration=timedelta(days=30)),
filter_condition="amount > 100" # Only transactions over $100
)
# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=orders,
inputs=["order_id"],
function=Count(),
time_window=ContinuousWindow(window_duration=timedelta(days=7)),
filter_condition="status = 'completed' AND payment_method = 'credit_card'"
)
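A filter_condition behaves like an extra predicate in the aggregation's WHERE clause. The following sketch uses an in-memory SQLite database as a stand-in for Spark to show that behavior for the first example above: a 30-day continuous window for one user, evaluated at a fixed time, with amount > 100 applied before summing. The table contents and evaluation time are made up, and this is not the query the library generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (user_id TEXT, transaction_time TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?)",
    [
        ("u1", "2024-05-01 10:00:00", 50.0),   # in the window, but fails amount > 100
        ("u1", "2024-05-10 10:00:00", 150.0),  # in the window and passes the filter
        ("u1", "2024-05-20 10:00:00", 300.0),  # in the window and passes the filter
        ("u1", "2024-06-05 10:00:00", 500.0),  # after the evaluation time: excluded
    ],
)

filter_condition = "amount > 100"  # same string as passed to create_feature (trusted input only)
evaluation_time = "2024-05-25 00:00:00"

# 30-day continuous window [T - 30 days, T) plus the filter as an extra WHERE predicate.
query = f"""
    SELECT SUM(amount)
    FROM transactions
    WHERE user_id = ?
      AND transaction_time >= datetime(?, '-30 days')
      AND transaction_time < ?
      AND ({filter_condition})
"""
(total,) = conn.execute(query, ("u1", evaluation_time, evaluation_time)).fetchone()
conn.close()
print(total)  # 450.0
```

Applying the filter inside the aggregation, instead of creating a filtered view first, keeps the feature definition self-contained, which is the motivation given above.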
Time windows
Feature engineering declarative APIs support three different window types to control lookback behavior for time window-based aggregations: continuous, tumbling, and sliding.
- Continuous windows look back from the event time. Duration and offset are explicitly defined.
- Tumbling windows are fixed, non-overlapping time windows. Each data point belongs to exactly one window.
- Sliding windows are overlapping, rolling time windows with a configurable slide interval.
The following illustration shows how they work.

Continuous window
Continuous windows provide up-to-date, real-time aggregates and are typically used over streaming data. In streaming pipelines, a continuous window emits a new row only when the contents of the fixed-length window change, such as when an event enters or leaves it. When a continuous window feature is used in training pipelines, an accurate point-in-time feature calculation is performed on the source data using the fixed-length window immediately preceding each event's timestamp. This helps prevent online-offline skew and data leakage. With no offset, features at time T aggregate events from [T − duration, T).
class ContinuousWindow(TimeWindow):
    window_duration: datetime.timedelta
    offset: Optional[datetime.timedelta] = None
The window start and end times are computed from these parameters as follows:
- Start time: evaluation_time - window_duration + offset (inclusive)
- End time: evaluation_time + offset (exclusive)
The following table lists the parameters for a continuous window.
| Parameter | Constraints |
|---|---|
| offset | Must be ≤ 0 (moves the window backward in time from the end timestamp). Use |
| window_duration | Must be > 0 |
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta
# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))
The following code defines a continuous window with an offset:
# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
window_duration=timedelta(days=7),
offset=timedelta(days=-1)
)
Continuous window examples
- window_duration=timedelta(days=7), offset=timedelta(days=0): This creates a 7-day lookback window ending at the current evaluation time. For an event at 2:00 PM on Day 7, this includes all events from 2:00 PM on Day 0 up to (but not including) 2:00 PM on Day 7.
- window_duration=timedelta(hours=1), offset=timedelta(minutes=-30): This creates a 1-hour lookback window ending 30 minutes before the evaluation time. For an event at 3:00 PM, this includes all events from 1:30 PM up to (but not including) 2:30 PM. This is useful to account for data ingestion delays.
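The start and end formulas above can be written as a small reference function, which is handy for checking the examples by hand. This is a sketch based only on the formulas and constraints in this section, not the library's implementation; continuous_window_bounds and in_window are hypothetical names.

```python
from datetime import datetime, timedelta


def continuous_window_bounds(evaluation_time, window_duration, offset=timedelta(0)):
    """Return (start, end) of a continuous window: start inclusive, end exclusive."""
    if window_duration <= timedelta(0):
        raise ValueError("window_duration must be > 0")
    if offset > timedelta(0):
        raise ValueError("offset must be <= 0")
    start = evaluation_time - window_duration + offset
    end = evaluation_time + offset
    return start, end


def in_window(event_time, bounds):
    """True if event_time falls in [start, end)."""
    start, end = bounds
    return start <= event_time < end


# Second example above: 1-hour window, 30-minute offset, event at 3:00 PM.
bounds = continuous_window_bounds(
    datetime(2024, 1, 1, 15, 0), timedelta(hours=1), offset=timedelta(minutes=-30)
)
print(bounds[0].time(), bounds[1].time())  # 13:30:00 14:30:00
```

An event at exactly 1:30 PM is counted, while one at exactly 2:30 PM is not, matching the "up to (but not including)" wording.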
Tumbling window
For features defined using tumbling windows, aggregations are computed over pre-determined fixed-length windows that advance by the full window duration, producing non-overlapping windows that fully partition time. As a result, each event in the source contributes to exactly one window. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.
class TumblingWindow(TimeWindow):
    window_duration: datetime.timedelta
The following table lists the parameters for a tumbling window.
| Parameter | Constraints |
|---|---|
| window_duration | Must be > 0 |
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta
window = TumblingWindow(
window_duration=timedelta(days=7)
)
Tumbling window example
window_duration=timedelta(days=5): This creates pre-determined fixed-length windows of 5 days each. For example, Window #1 spans Day 0 to Day 4, Window #2 spans Day 5 to Day 9, Window #3 spans Day 10 to Day 14, and so on. Specifically, Window #1 includes all events with timestamps starting at 00:00:00.00 on Day 0 up to (but not including) any events with timestamp 00:00:00.00 on Day 5. Each event belongs to exactly one window.
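The following sketch assigns an event to its tumbling window by counting whole window durations since the Unix epoch, which follows from the statement that windows start at the epoch. It assumes UTC timestamps; tumbling_window is a hypothetical helper, not a library function.

```python
from datetime import datetime, timedelta, timezone

# Windows are aligned to the Unix epoch (assumed UTC here).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, window_duration):
    """Return (start, end) of the one tumbling window containing event_time.

    start is inclusive and end is exclusive, so every instant maps to exactly one window.
    """
    if window_duration <= timedelta(0):
        raise ValueError("window_duration must be > 0")
    index = (event_time - EPOCH) // window_duration  # timedelta // timedelta -> int
    start = EPOCH + index * window_duration
    return start, start + window_duration


five_days = timedelta(days=5)
# Counting Day 0 as the epoch: Day 4 23:59 is in Window #1, Day 5 00:00 starts Window #2.
w1 = tumbling_window(EPOCH + timedelta(days=4, hours=23, minutes=59), five_days)
w2 = tumbling_window(EPOCH + timedelta(days=5), five_days)
print((w1[0] - EPOCH).days, (w1[1] - EPOCH).days)  # 0 5
print((w2[0] - EPOCH).days, (w2[1] - EPOCH).days)  # 5 10

# Epoch alignment means 5-day windows do not line up with calendar boundaries.
start, end = tumbling_window(datetime(2024, 1, 1, tzinfo=timezone.utc), five_days)
print(start.date(), end.date())  # 2023-12-29 2024-01-03
```

The last call is a reminder that a tumbling window containing January 1 can start in the previous year, which is worth keeping in mind when aligning windows with business cycles.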
Sliding window
For features defined using sliding windows, aggregations are computed over a pre-determined fixed-length window that advances by a slide interval, producing overlapping windows. Each event in the source can contribute to feature aggregation for multiple windows. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.
class SlidingWindow(TimeWindow):
    window_duration: datetime.timedelta
    slide_duration: datetime.timedelta
The following table lists the parameters for a sliding window.
| Parameter | Constraints |
|---|---|
| window_duration | Must be > 0 |
| slide_duration | Must be > 0 and < window_duration |
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta
window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)
Sliding window example
window_duration=timedelta(days=5), slide_duration=timedelta(days=1): This creates overlapping 5-day windows that advance by 1 day each time. For example, Window #1 spans Day 0 to Day 4, Window #2 spans Day 1 to Day 5, Window #3 spans Day 2 to Day 6, and so on. Each window includes events from 00:00:00.00 on its first day up to (but not including) 00:00:00.00 on the day after its last day; for example, Window #1 includes events from 00:00:00.00 on Day 0 up to (but not including) 00:00:00.00 on Day 5. Because windows overlap, a single event can belong to multiple windows (in this example, each event belongs to up to 5 different windows).
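The following sketch lists every sliding window that contains a given event, assuming, as for tumbling windows, that windows are aligned to the Unix epoch and timestamps are in UTC. sliding_windows is a hypothetical helper, not a library function. For the 5-day window with a 1-day slide above, an event well after the epoch falls into exactly 5 windows.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, window_duration, slide_duration):
    """Return every (start, end) sliding window containing event_time, oldest first."""
    if not timedelta(0) < slide_duration < window_duration:
        raise ValueError("slide_duration must be > 0 and < window_duration")
    windows = []
    # Start from the latest window that begins at or before the event and walk backward.
    index = (event_time - EPOCH) // slide_duration
    while index >= 0:  # no windows start before the epoch
        start = EPOCH + index * slide_duration
        end = start + window_duration
        if end <= event_time:  # this window (and every earlier one) ended before the event
            break
        windows.append((start, end))
        index -= 1
    return windows[::-1]


event = EPOCH + timedelta(days=10, hours=12)
windows = sliding_windows(event, timedelta(days=5), timedelta(days=1))
for start, end in windows:
    print((start - EPOCH).days, (end - EPOCH).days)
# prints 6 11, 7 12, 8 13, 9 14, 10 15 (one pair per line)
```

Events close to the epoch fall into fewer windows (an event on Day 1 is only in the windows starting on Day 0 and Day 1), which is why the example above says "up to 5".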
API Methods
create_training_set()
Join features with labeled data for ML training:
FeatureEngineeringClient.create_training_set(
df: DataFrame, # DataFrame with training data
features: Optional[List[Feature]], # List of Feature objects
label: Union[str, List[str], None], # Label column name(s)
exclude_columns: Optional[List[str]] = None, # Optional: columns to exclude
# API continues to support creating training set using materialized feature tables and functions
) -> TrainingSet
Call TrainingSet.load_df to get the original training data joined with dynamically computed, point-in-time correct features.
Requirements for the df argument:
- Must contain all entity_columns from the feature data sources.
- Must contain the timeseries_column from the feature data sources.
- Should contain the label column(s).
Point-in-time correctness: Features are computed with only source data available before each row's timestamp, in order to prevent future data leakage into model training. Computations leverage Spark's windowing functions for efficiency.
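The following naive sketch illustrates point-in-time correctness for a 7-day continuous window with no offset: each labeled row only sees source events in [T - 7 days, T), so an event with the same timestamp as the label is excluded. The data and the point_in_time_sum helper are hypothetical, and the nested loop is for clarity only; the library performs this computation with Spark window functions.

```python
from datetime import datetime, timedelta

# Made-up source events: (user_id, event_time, amount).
source_events = [
    ("u1", datetime(2024, 3, 1), 20.0),
    ("u1", datetime(2024, 3, 5), 30.0),
    ("u1", datetime(2024, 3, 9), 50.0),
    ("u2", datetime(2024, 3, 4), 70.0),
]

# Made-up labeled observations: (user_id, transaction_time, target).
labeled_rows = [
    ("u1", datetime(2024, 3, 6), 1),  # sees the 3/1 and 3/5 events
    ("u1", datetime(2024, 3, 9), 0),  # must not see its own 3/9 event; 3/1 is too old
    ("u2", datetime(2024, 3, 4), 1),  # its only event is at T itself, so nothing counts
]


def point_in_time_sum(rows, events, window):
    """Append sum(amount) over [T - window, T) for the same user to each labeled row."""
    out = []
    for user_id, ts, target in rows:
        total = sum(
            amount
            for ev_user, ev_time, amount in events
            if ev_user == user_id and ts - window <= ev_time < ts
        )
        out.append((user_id, ts, total, target))
    return out


training_rows = point_in_time_sum(labeled_rows, source_events, timedelta(days=7))
for row in training_rows:
    print(row)
```

A plain join on user_id would give the second row a total that includes the 3/9 event, which is exactly the future-data leakage that the time bound prevents.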
log_model()
Log a model with feature metadata for lineage tracking and automatic feature lookup during inference:
FeatureEngineeringClient.log_model(
model, # Trained model object
artifact_path: str, # Path to store model artifact
flavor: ModuleType, # MLflow flavor module (e.g., mlflow.sklearn)
training_set: TrainingSet, # TrainingSet used for training
registered_model_name: Optional[str], # Optional: register model in Unity Catalog
)
The flavor parameter specifies the MLflow model flavor module to use, such as mlflow.sklearn or mlflow.xgboost.
Models logged with a TrainingSet automatically track lineage to the features used in training. For detailed documentation, see Use features to train models.
score_batch()
Perform batch inference with automatic feature lookup:
FeatureEngineeringClient.score_batch(
model_uri: str, # URI of logged model
df: DataFrame, # DataFrame with entity keys and timestamps
) -> DataFrame
score_batch uses the feature metadata stored with the model to automatically compute point-in-time correct features for inference, ensuring consistency with training. For detailed documentation, see Use features to train models.
Best practices
Feature naming
- Use descriptive names for business-critical features.
- Follow consistent naming conventions across teams.
- Let auto-generation handle exploratory features.
Time windows
- Use offsets to exclude unstable recent data.
- Align window boundaries with business cycles (daily, weekly).
- Consider data freshness vs. feature stability tradeoffs.
Performance
- Group features by data source to minimize data scans.
- Use appropriate window sizes for your use case.
Testing
- Test time window boundaries with known data scenarios.
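As one way to test window boundaries with known data, the following pytest-style sketch places events exactly on and just beside the boundaries of a 1-day continuous window and checks which ones are included. The reference function encodes the documented formulas (start inclusive, end exclusive); comparing it against features computed by the library on the same data is left to your own test setup, since that part depends on your tables.

```python
from datetime import datetime, timedelta


def events_in_continuous_window(event_times, evaluation_time, window_duration, offset=timedelta(0)):
    """Reference implementation of the documented bounds: start inclusive, end exclusive."""
    start = evaluation_time - window_duration + offset
    end = evaluation_time + offset
    return [t for t in event_times if start <= t < end]


def test_continuous_window_boundaries():
    t = datetime(2024, 6, 1, 12, 0)
    one_day = timedelta(days=1)
    tick = timedelta(microseconds=1)  # smallest step a datetime can represent
    before_start = t - one_day - tick
    at_start = t - one_day
    before_end = t - tick
    at_end = t
    events = [before_start, at_start, before_end, at_end]

    # No offset: the start boundary is included, the evaluation time itself is not.
    assert events_in_continuous_window(events, t, one_day) == [at_start, before_end]

    # An offset of -1 microsecond shifts both boundaries back by one tick.
    shifted = events_in_continuous_window(events, t, one_day, offset=-tick)
    assert shifted == [before_start, at_start]


test_continuous_window_boundaries()
print("boundary checks passed")
```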
Common patterns
Customer analytics
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=1))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=90))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["amount"],
function=Sum(), time_window=ContinuousWindow(window_duration=timedelta(days=30)))
]
Trend analysis
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7))
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7))
)
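To see what the two windows above cover, the following sketch evaluates both definitions on a few made-up events using the continuous-window formulas from the Time windows section: the recent window is [T - 7d, T) and the historical window is [T - 14d, T - 7d). The window_avg helper is hypothetical, and the ratio at the end is ordinary post-processing you would compute yourself after loading the training set, not a declarative feature.

```python
from datetime import datetime, timedelta
from statistics import mean


def window_avg(events, evaluation_time, window_duration, offset=timedelta(0)):
    """Average amount over [T - duration + offset, T + offset), or None if the window is empty."""
    start = evaluation_time - window_duration + offset
    end = evaluation_time + offset
    values = [amount for ts, amount in events if start <= ts < end]
    return mean(values) if values else None


t = datetime(2024, 4, 15)
events = [
    (datetime(2024, 4, 2), 40.0),    # 13 days before t: historical window
    (datetime(2024, 4, 6), 60.0),    # 9 days before t: historical window
    (datetime(2024, 4, 10), 90.0),   # 5 days before t: recent window
    (datetime(2024, 4, 14), 110.0),  # 1 day before t: recent window
]

recent = window_avg(events, t, timedelta(days=7))                               # [4/8, 4/15)
historical = window_avg(events, t, timedelta(days=7), offset=timedelta(days=-7))  # [4/1, 4/8)
trend_ratio = recent / historical
print(recent, historical, trend_ratio)  # 100.0 50.0 2.0
```

In real data either window can be empty for a given entity, so guard the division before deriving ratios like this.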
Seasonal patterns
# 1-day window ending exactly 4 weeks before the evaluation time (same weekday and time of day)
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4))
)
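Applying the continuous-window formulas to the seasonal example shows exactly which 24 hours it covers. The following sketch uses a made-up evaluation time on a Monday at 09:00.

```python
from datetime import datetime, timedelta

evaluation_time = datetime(2024, 7, 15, 9, 0)  # a Monday, 09:00 (made-up)
window_duration = timedelta(days=1)
offset = timedelta(weeks=-4)

start = evaluation_time - window_duration + offset  # inclusive
end = evaluation_time + offset                      # exclusive
print(start, end)                                   # 2024-06-16 09:00:00 2024-06-17 09:00:00
print(start.strftime("%A"), end.strftime("%A"))     # Sunday Monday
```

The window is the 24 hours leading up to the same weekday and time four weeks earlier, so unless the evaluation time is at midnight it spans two calendar days.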
Limitations
- Names of entity and timeseries columns must match between the training (labeled) dataset and the source tables when used in the create_training_set API.
- The column name used as the label column in the training dataset should not exist in the source tables used for defining Features.
- A limited list of functions (UDAFs) is supported in the create_feature API.