Declarative features
This feature is Beta and is available in the following regions: us-east-1 and us-west-2.
The Declarative Feature Engineering APIs enable you to define and compute time-windowed aggregation features from data sources. This guide covers the following workflows:
- Feature development workflow
  - Use `create_feature` to define Unity Catalog feature objects that can be used in model training and serving workflows.
- Model training workflow
  - Use `create_training_set` to calculate point-in-time aggregated features for machine learning. For detailed documentation on training with declarative features, see Train models with declarative features.
- Feature materialization and serving workflow
  - After defining a feature with `create_feature` or retrieving it using `get_feature`, you can use `materialize_features` to materialize the feature or set of features to an offline store for efficient reuse, or to an online store for online serving.
  - Use `create_training_set` with the materialized view to prepare an offline batch training dataset.
Requirements
-
A classic compute cluster running Databricks Runtime 17.0 ML or above.
-
You must install the `databricks-feature-engineering` Python package. Run the following lines each time the notebook starts:
%pip install databricks-feature-engineering>=0.14.0
dbutils.library.restartPython()
Quickstart example
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import DeltaTableSource, Sum, Avg, ContinuousWindow, SlidingWindow, OfflineStoreConfig
from datetime import timedelta
import mlflow
CATALOG_NAME = "main"
SCHEMA_NAME = "feature_store"
TABLE_NAME = "transactions"
# 1. Create data source
source = DeltaTableSource(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name=TABLE_NAME,
entity_columns=["user_id"],
timeseries_column="transaction_time"
)
# 2. Define features
fe = FeatureEngineeringClient()
features = [
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
name="avg_transaction_30d",
source=source,
inputs=["amount"],
function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=30))
),
fe.create_feature(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
source=source,
inputs=["amount"],
function=Sum(),
time_window=SlidingWindow(window_duration=timedelta(days=7), slide_duration=timedelta(days=1))
# name auto-generated from the input column, function, and window
),
]
# 3. Create training set using declarative features
# `labeled_df` should have columns "user_id", "transaction_time", and "target".
# It can have other context features specific to the individual observations.
training_set = fe.create_training_set(
df=labeled_df,
features=features,
label="target",
)
training_set.load_df().display() # action: joins labeled_df with the computed features
# 4. Train model
with mlflow.start_run():
training_df = training_set.load_df()
# training code
fe.log_model(
model=model,
artifact_path="recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.recommendation_model",
)
# 5. (Optional) Materialize features for serving
fe.materialize_features(
features=features,
offline_config=OfflineStoreConfig(
catalog_name=CATALOG_NAME,
schema_name=SCHEMA_NAME,
table_name_prefix="customer_features"
),
pipeline_state="ACTIVE",
cron_schedule="0 0 * * * ?" # Hourly
)
After materializing features, you can serve models using CPU model serving. For details, see Materialize declarative features.
Data sources
DeltaTableSource
DeltaTableSource is an ephemeral Python object used to define how features are computed from a source table. It does not create a new table—it specifies the configuration for reading data and aggregating features.
Permitted data types for timeseries_column: TimestampType, DateType. Integer types may also work, but they cause a loss of precision in time window aggregates.
Understanding entities
The entity_columns parameter specifies the columns that define the level of aggregation for your features. Entities determine:
- How data is grouped: Features are aggregated per unique combination of entity values (similar to `GROUP BY` in SQL)
- The primary key structure: Each unique entity combination results in one row of computed features
Example: Customer-level features
The following code aggregates features at the customer level (one row per customer):
from databricks.feature_engineering.entities import DeltaTableSource
source = DeltaTableSource(
catalog_name="main", # Catalog name
schema_name="analytics", # Schema name
table_name="user_events", # Table name
entity_columns=["user_id"], # Features aggregated per user
timeseries_column="event_time" # Timestamp for time windows
)
Example: Customer-Store-level features
To aggregate features at a more detailed level (one row per customer-store combination), use multiple entity columns:
source = DeltaTableSource(
catalog_name="main",
schema_name="retail",
table_name="transactions",
entity_columns=["user_id", "store_id"], # Features aggregated per user-store pair
timeseries_column="transaction_time"
)
When you need features at different levels of aggregation (for example, customer-level and customer-store-level), create separate DeltaTableSource objects with different entity_columns configurations, even if they reference the same underlying table.
Declarative Feature Engineering API
create_feature() API
FeatureEngineeringClient.create_feature() provides comprehensive validation and ensures proper feature construction:
FeatureEngineeringClient.create_feature(
source: DataSource, # Required: DeltaTableSource
inputs: List[str], # Required: List of column names from the source
function: Union[Function, str], # Required: Aggregation function (Sum, Avg, Count, etc.)
time_window: TimeWindow, # Required: TimeWindow for aggregation
catalog_name: str, # Required: The catalog name for the feature
schema_name: str, # Required: The schema name for the feature
name: Optional[str], # Optional: Feature name (auto-generated if omitted)
description: Optional[str], # Optional: Feature description
filter_condition: Optional[str], # Optional: SQL WHERE clause to filter source data
) -> Feature
Parameters:
- `source`: The data source used in feature computation
- `inputs`: List of column names from the source to use as input for aggregation
- `function`: The aggregation function (`Function` instance or string name). See the list of supported functions below.
- `time_window`: The time window for aggregation (`TimeWindow` instance or dict with 'duration' and optional 'offset')
- `catalog_name`: The catalog name for the feature
- `schema_name`: The schema name for the feature
- `name`: Optional feature name (auto-generated if omitted)
- `description`: Optional description of the feature
- `filter_condition`: Optional SQL WHERE clause to filter source data before aggregation. Examples: `"status = 'completed'"`, `"transaction = 'Credit' AND amount > 100"`
Returns: A validated Feature instance
Raises: ValueError if any validation fails
Auto-generated names
When `name` is omitted, names follow the pattern: `{column}_{function}_{window}`. For example:

- `price_avg_continuous_1h` (1-hour average price)
- `transaction_count_continuous_30d_1d` (30-day transaction count with a 1-day offset from the event timestamp)
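To make the pattern concrete, the following pure-Python sketch reproduces the documented naming scheme. This is illustrative only, not the library's actual implementation; in particular, the duration-formatting rules in `_fmt` are an assumption based on the examples above.

```python
from datetime import timedelta

def _fmt(td: timedelta) -> str:
    # Render a duration in the compact unit style seen in generated names
    # ("30d", "1h", "15m"). The exact formatting rules are an assumption.
    if td % timedelta(days=1) == timedelta(0):
        return f"{td // timedelta(days=1)}d"
    if td % timedelta(hours=1) == timedelta(0):
        return f"{td // timedelta(hours=1)}h"
    return f"{td // timedelta(minutes=1)}m"

def auto_name(column, function, window_type, duration, offset=None):
    # {column}_{function}_{window}: the window part appends the offset when present
    parts = [column, function, window_type, _fmt(duration)]
    if offset:
        parts.append(_fmt(abs(offset)))
    return "_".join(parts)

print(auto_name("price", "avg", "continuous", timedelta(hours=1)))
# price_avg_continuous_1h
print(auto_name("transaction", "count", "continuous", timedelta(days=30), timedelta(days=-1)))
# transaction_count_continuous_30d_1d
```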
Supported functions
All functions are applied over an aggregation time-window as described in the time windows section below.
| Function | Shorthand | Description | Example use case |
|---|---|---|---|
| `Sum` | `"sum"` | Total of values | Per user daily app usage in minutes |
| `Avg` | `"avg"` | Average of values | Mean transaction amount |
| `Count` | `"count"` | Number of records | Number of logins per user |
| `Min` | `"min"` | Minimum value | Lowest heart rate recorded by a wearable device |
| `Max` | `"max"` | Maximum value | Maximum basket size of items per session |
| `StddevPop` | `"stddev_pop"` | Population standard deviation | Daily transaction amount variability across all customers |
| `StddevSamp` | `"stddev_samp"` | Sample standard deviation | Variability of ad campaign click-through rates |
| `VarPop` | `"var_pop"` | Population variance | Spread of sensor readings for IoT devices in a factory |
| `VarSamp` | `"var_samp"` | Sample variance | Spread of movie ratings over a sampled group |
| `ApproxCountDistinct`* | `"approx_count_distinct"` | Approximate unique count | Distinct count of items purchased |
| `ApproxPercentile`* | N/A | Approximate percentile | p95 response latency |
| `First` | `"first"` | First value | First login timestamp |
| `Last` | `"last"` | Last value | Most recent purchase amount |

*Functions with parameters use default values when using string shorthand.
The following example shows window-aggregation features defined over the same data source.
from databricks.feature_engineering.entities import Sum, Avg, Count, Max, ApproxCountDistinct
fe = FeatureEngineeringClient()
sum_feature = fe.create_feature(source=source, inputs=["amount"], function=Sum(), ...)
avg_feature = fe.create_feature(source=source, inputs=["amount"], function=Avg(), ...)
distinct_count = fe.create_feature(
source=source,
inputs=["product_id"],
function=ApproxCountDistinct(relativeSD=0.01),
...
)
Features with filter conditions
The filter_condition parameter filters rows from the source table before aggregations are computed. It behaves like a SQL WHERE clause applied before GROUP BY: it does not change the aggregation level, which is always defined by entity_columns. Filters are useful when a large source table contains a superset of the data needed for feature computation, and they minimize the need to create separate views on top of such tables.
from databricks.feature_engineering.entities import Sum, Count, ContinuousWindow
from datetime import timedelta
# Only aggregate high-value transactions
high_value_sales = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=transactions,
inputs=["amount"],
function=Sum(),
time_window=ContinuousWindow(window_duration=timedelta(days=30)),
filter_condition="amount > 100" # Only transactions over $100
)
# Multiple conditions using SQL syntax
completed_orders = fe.create_feature(
catalog_name="main",
schema_name="ecommerce",
source=orders,
inputs=["order_id"],
function=Count(),
time_window=ContinuousWindow(window_duration=timedelta(days=7)),
filter_condition="status = 'completed' AND payment_method = 'credit_card'"
)
Time windows
Declarative Feature Engineering APIs support three different window types to control lookback behavior for time window-based aggregations: continuous, tumbling, and sliding.
- Continuous windows look back from the event time. Duration and offset are explicitly defined.
- Tumbling windows are fixed, non-overlapping time windows. Each data point belongs to exactly one window.
- Sliding windows are overlapping, rolling time windows with a configurable slide interval.
The following illustration shows how they work.

Continuous window
Continuous windows are up-to-date and real-time aggregates, typically used over streaming data. In streaming pipelines, the continuous window emits a new row only when the contents of the fixed-length window change, such as when an event enters or leaves. When a continuous window feature is used in training pipelines, an accurate point-in-time feature calculation is performed on the source data using the fixed-length window duration immediately preceding a specific event's timestamp. This helps prevent online-offline skew or data leakage. Features at time T aggregate events from [T − duration, T).
class ContinuousWindow(TimeWindow):
window_duration: datetime.timedelta
offset: Optional[datetime.timedelta] = None
The following table lists the parameters for a continuous window. The window start and end times are computed from these parameters as follows:

- Start time: `evaluation_time - window_duration + offset` (inclusive)
- End time: `evaluation_time + offset` (exclusive)

| Parameter | Constraints |
|---|---|
| `window_duration` | Must be > 0 |
| `offset` | Must be ≤ 0 (moves the window backward in time from the end timestamp) |
from databricks.feature_engineering.entities import ContinuousWindow
from datetime import timedelta
# Look back 7 days from evaluation time
window = ContinuousWindow(window_duration=timedelta(days=7))
The following code defines a continuous window with an offset:
# Look back 7 days, but end 1 day ago (exclude most recent day)
window = ContinuousWindow(
window_duration=timedelta(days=7),
offset=timedelta(days=-1)
)
Continuous window examples
- `window_duration=timedelta(days=7)`, `offset=timedelta(days=0)`: This creates a 7-day lookback window ending at the current evaluation time. For an event at 2:00 PM on Day 7, this includes all events from 2:00 PM on Day 0 up to (but not including) 2:00 PM on Day 7.
- `window_duration=timedelta(hours=1)`, `offset=timedelta(minutes=-30)`: This creates a 1-hour lookback window ending 30 minutes before the evaluation time. For an event at 3:00 PM, this includes all events from 1:30 PM up to (but not including) 2:30 PM. This is useful to account for data ingestion delays.
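The examples above can be checked with a small sketch of the window-bound arithmetic. Note that `continuous_window_bounds` is a hypothetical helper written here for illustration, not part of the API:

```python
from datetime import datetime, timedelta

def continuous_window_bounds(evaluation_time, window_duration, offset=timedelta(0)):
    # Start (inclusive): evaluation_time - window_duration + offset
    # End (exclusive):   evaluation_time + offset
    start = evaluation_time - window_duration + offset
    end = evaluation_time + offset
    return start, end

# 1-hour window ending 30 minutes before a 3:00 PM event
start, end = continuous_window_bounds(
    datetime(2024, 1, 1, 15, 0),
    window_duration=timedelta(hours=1),
    offset=timedelta(minutes=-30),
)
print(start.time(), end.time())  # 13:30:00 14:30:00
```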
Tumbling window
For features defined using tumbling windows, aggregations are computed over pre-determined, non-overlapping, fixed-length windows that fully partition time. Each window advances by the window_duration, so there are no gaps or overlaps between consecutive windows. As a result, each event in the source contributes to exactly one window. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.
class TumblingWindow(TimeWindow):
window_duration: datetime.timedelta
The following table lists the parameters for a tumbling window.
| Parameter | Constraints |
|---|---|
| `window_duration` | Must be > 0 |
from databricks.feature_engineering.entities import TumblingWindow
from datetime import timedelta
window = TumblingWindow(
window_duration=timedelta(days=7)
)
Tumbling window example
`window_duration=timedelta(days=5)`: This creates pre-determined fixed-length windows of 5 days each. Example: Window #1 spans Day 0 to Day 4, Window #2 spans Day 5 to Day 9, Window #3 spans Day 10 to Day 14, and so on. Specifically, Window #1 includes all events with timestamps starting at `00:00:00.00` on Day 0 up to (but not including) any events with timestamp `00:00:00.00` on Day 5. Each event belongs to exactly one window.
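Because tumbling windows are epoch-aligned, the window containing any given timestamp can be computed directly by floor division. A minimal sketch (the helper is hypothetical, not part of the API):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # windows start at the Unix epoch

def tumbling_window(ts, window_duration):
    # Floor-divide the time since the epoch to get the window index,
    # then reconstruct that window's [start, end) bounds.
    idx = (ts - EPOCH) // window_duration
    start = EPOCH + idx * window_duration
    return start, start + window_duration

# An event on Day 6 falls in Window #2 (Day 5 to Day 9) of a 5-day scheme
start, end = tumbling_window(EPOCH + timedelta(days=6), timedelta(days=5))
print((start - EPOCH).days, (end - EPOCH).days)  # 5 10
```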
Sliding window
For features defined using sliding windows, aggregations are computed over a pre-determined fixed-length window that advances by a slide interval, producing overlapping windows. Each event in the source can contribute to feature aggregation for multiple windows. Features at time t aggregate data from windows ending at or before t (exclusive). Windows start at the Unix epoch.
class SlidingWindow(TimeWindow):
window_duration: datetime.timedelta
slide_duration: datetime.timedelta
The following table lists the parameters for a sliding window.
| Parameter | Constraints |
|---|---|
| `window_duration` | Must be > 0 |
| `slide_duration` | Must be > 0 and < `window_duration` |
from databricks.feature_engineering.entities import SlidingWindow
from datetime import timedelta
window = SlidingWindow(
window_duration=timedelta(days=7),
slide_duration=timedelta(days=1)
)
Sliding window example
`window_duration=timedelta(days=5)`, `slide_duration=timedelta(days=1)`: This creates overlapping 5-day windows that advance by 1 day each time. Example: Window #1 spans Day 0 to Day 4, Window #2 spans Day 1 to Day 5, Window #3 spans Day 2 to Day 6, and so on. Each window includes events from `00:00:00.00` on its start day up to (but not including) `00:00:00.00` on the day after its last day. Because windows overlap, a single event can belong to multiple windows (in this example, each event belongs to up to 5 different windows).
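The overlap behavior can be sketched in plain Python by enumerating the epoch-aligned windows that contain a given event (the helper is illustrative only, not part of the API):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)  # windows start at the Unix epoch

def sliding_windows_containing(ts, window_duration, slide_duration):
    # A window [start, start + window_duration) contains ts iff
    # start <= ts < start + window_duration; starts occur every slide_duration.
    windows = []
    idx = (ts - EPOCH) // slide_duration       # latest window starting at or before ts
    start = EPOCH + idx * slide_duration
    while start + window_duration > ts and start >= EPOCH:
        windows.append((start, start + window_duration))
        start -= slide_duration
    return list(reversed(windows))             # earliest window first

# An event midway through Day 10 belongs to the 5 windows starting on Days 6-10
hits = sliding_windows_containing(
    EPOCH + timedelta(days=10, hours=12),
    window_duration=timedelta(days=5),
    slide_duration=timedelta(days=1),
)
print(len(hits))                   # 5
print((hits[0][0] - EPOCH).days)   # 6
```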
Feature materialization
After you define features, you can materialize them to offline or online stores for efficient reuse in training and serving workflows. For details, see Materialize declarative features.
Model training and inference
To train models and run batch inference with declarative features, including log_model(), score_batch(), and create_training_set(), see Train models with declarative features.
Best practices
Feature naming
- Use descriptive names for business-critical features.
- Follow consistent naming conventions across teams.
- Let auto-generation handle exploratory features.
Time windows
- Use offsets to exclude unstable recent data.
- Align window boundaries with business cycles (daily, weekly).
- Consider data freshness vs. feature stability tradeoffs.
Performance
- Group features by data source to minimize data scans.
- Use appropriate window sizes for your use case.
Testing
- Test time window boundaries with known data scenarios.
Entity columns vs. filter conditions
Use this decision guide when working with features from the same source table:
Use `entity_columns` when you need different aggregation levels:

- Customer-level features (one row per customer): `entity_columns=["customer_id"]`
- Customer-merchant features (multiple rows per customer): `entity_columns=["customer_id", "merchant_id"]`
- Different aggregation levels require separate `DeltaTableSource` objects, even if they reference the same underlying table

Use `filter_condition` when you need to filter rows at the same aggregation level:

- High-value transactions only: `filter_condition="amount > 100"` (still aggregated per customer)
- Completed orders only: `filter_condition="status = 'completed'"` (still aggregated per customer)
Rule of thumb: If your change would result in a different number of rows per entity value, you need separate sources with different entity_columns. If you're just filtering which rows contribute to the same aggregation, use filter_condition.
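The rule of thumb can be illustrated with plain Python as a toy stand-in for the SQL semantics (the data and column names are made up for this example):

```python
from collections import defaultdict

rows = [
    {"customer_id": "c1", "merchant_id": "m1", "amount": 50},
    {"customer_id": "c1", "merchant_id": "m2", "amount": 150},
    {"customer_id": "c2", "merchant_id": "m1", "amount": 200},
]

def aggregate(rows, entity_columns, predicate=lambda r: True):
    # Filter first (like filter_condition / WHERE), then group and sum
    # per unique entity combination (like entity_columns / GROUP BY).
    totals = defaultdict(float)
    for r in rows:
        if predicate(r):
            key = tuple(r[c] for c in entity_columns)
            totals[key] += r["amount"]
    return dict(totals)

# Customer level: one result row per customer
print(aggregate(rows, ["customer_id"]))
# {('c1',): 200.0, ('c2',): 200.0}

# Customer-merchant level: a different number of rows per customer
print(aggregate(rows, ["customer_id", "merchant_id"]))

# Same customer level, but only high-value rows contribute
print(aggregate(rows, ["customer_id"], lambda r: r["amount"] > 100))
# {('c1',): 150.0, ('c2',): 200.0}
```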
Common patterns
Customer analytics
fe = FeatureEngineeringClient()
features = [
# Recency: Number of transactions in the last day
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=1))),
# Frequency: transaction count over the last 90 days
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["transaction_id"],
function=Count(), time_window=ContinuousWindow(window_duration=timedelta(days=90))),
# Monetary: total spend in the last month
fe.create_feature(catalog_name="main", schema_name="ecommerce", source=transactions, inputs=["amount"],
function=Sum(), time_window=ContinuousWindow(window_duration=timedelta(days=30)))
]
Trend analysis
# Compare recent vs. historical behavior
fe = FeatureEngineeringClient()
recent_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7))
)
historical_avg = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=7), offset=timedelta(days=-7))
)
Seasonal patterns
# Same day of week, 4 weeks ago
fe = FeatureEngineeringClient()
weekly_pattern = fe.create_feature(
catalog_name="main", schema_name="ecommerce",
source=transactions, inputs=["amount"], function=Avg(),
time_window=ContinuousWindow(window_duration=timedelta(days=1), offset=timedelta(weeks=-4))
)
Limitations
- Continuous features cannot be materialized. Because they preserve exact point-in-time correctness, continuous features for offline training or batch inference are computed on the fly for each data point.
- Names of entity and timeseries columns must match between the training (labeled) dataset and source tables when used in the `create_training_set` API.
- The column name used as the `label` column in the training dataset should not exist in the source tables used for defining features.
- A limited list of functions (UDAFs) is supported in the `create_feature` API.
- You can only work with materialized features in the workspace in which they were created.
- Deleting and pausing a feature must be managed manually at the pipeline level.