REPLACE WHERE flows
REPLACE WHERE flows are in Beta.
This page describes how to use REPLACE WHERE flows in Lakeflow Spark Declarative Pipelines to recompute and overwrite a targeted subset of a table without reprocessing your entire table history. REPLACE WHERE flows handle late-arriving data, upstream reprocessing, schema evolution, and backfills.
With a REPLACE WHERE flow, you define a predicate on the target table. All rows matching the predicate are deleted and replaced by re-evaluating the source query for that same predicate range. Rows that don't match the predicate are left untouched.
Requirements
- Your pipeline must use the PREVIEW channel.
- Databricks recommends Unity Catalog and serverless compute for the best experience.
When to use REPLACE WHERE flows
REPLACE WHERE flows are well-suited for the following scenarios:
- Incremental batch processing without streaming semantics: Process new rows in batches without streaming concepts such as watermarks.
- Selective reprocessing: Recompute only rows that match a predicate while leaving all other rows untouched.
- Scenarios beyond standard materialized view capabilities:
  - Target tables with longer retention than the source
  - Preventing recomputation when a dimension table changes
  - Schema evolution without recomputing entire history
Predicate design guidelines
Avoid REPLACE WHERE predicates on aggregates or derived columns. For example, a predicate like total_sales > 100000, where total_sales is a SUM(), forces the engine to recompute the aggregation for all partitions on every run. Use predicates on base columns such as date or region instead, so the engine can push the filter down to the source and process only the relevant data.
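As an illustration, the following Python sketch keeps the predicate on the base date column even though the table stores an aggregate. The sales_raw source and its columns are illustrative, and the @dp.table syntax is covered in the next section:

from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(
    # Predicate on a base column: the engine can push this filter down to
    # the source and recompute only the matching rows.
    replace_where=F.col("date") >= F.date_sub(F.current_date(), 7)
)
def sales_daily():
    # Aggregate freely in the query body; just don't reference the derived
    # total_sales column in the replace_where predicate.
    return (
        spark.read.table("sales_raw")
        .groupBy("date", "region")
        .agg(F.sum("amount").alias("total_sales"))
    )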
Create a REPLACE WHERE flow
You can define REPLACE WHERE flows in both SQL and Python.
SQL
Use the FLOW REPLACE WHERE clause inline with CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;
You can also use the long-form CREATE FLOW syntax:
CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;
Python
In Python, the table and the flow are defined in a single statement. The flow inherits the same name as the table:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
    replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
    # product_id must be selected so the join key below is available.
    orders_fct = spark.read.table("orders_fct").select(
        "product_id", "date", "order_id", "region", "qty", "price"
    )
    product_dim = spark.read.table("product_dim")
    return orders_fct.join(product_dim, "product_id")
The replace_where parameter accepts either a PySpark column expression or a string predicate.
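For example, the same 7-day window can be written as a string; this sketch assumes the string is parsed as a standard Spark SQL boolean expression:

from pyspark import pipelines as dp

@dp.table(
    # String form of the column-expression predicate shown above.
    replace_where="date >= date_sub(current_date(), 7)"
)
def orders_enriched():
    orders_fct = spark.read.table("orders_fct").select(
        "product_id", "date", "order_id", "region", "qty", "price"
    )
    return orders_fct.join(spark.read.table("product_dim"), "product_id")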
In these examples, all rows from the last 7 days are deleted from orders_enriched and recomputed from the source query. You don't need to repeat the predicate in the source query; the pipeline engine applies it automatically when reading from the source.
BY NAME is required in SQL. It matches columns by name rather than position.
Perform a backfill with predicate overrides
You can override the REPLACE WHERE predicate for a single pipeline update without modifying the pipeline definition. Predicate overrides are one-time, apply only to the current update, and don't affect future runs.
Example: Initial historical load
To perform a one-time backfill of historical data when first setting up a pipeline:
pipeline_id = "<pipeline-id>"
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
    }
]
resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
)
print(resp)
The historical data must come from the same source as the incremental data. If you have historical data from a different source, use DML statements directly on the target table. See Backfill with DML statements.
Example: Correct a column for a specific period
After updating a column definition, backfill the change for a targeted historical range:
pipeline_id = "<pipeline-id>"
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_date(), -30)",
    }
]
resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
    refresh_selection=["orders_enriched"],
)
print(resp)
You can also combine multiple dimensions in the predicate override:
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
    }
]
Helper function: start_update_with_replace_where
Use the pipeline update API from a notebook to submit predicate overrides:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
    pipeline_id: str,
    replace_where_overrides: list[dict],
    refresh_selection: list[str] | None = None,
) -> StartUpdateResponse:
    """Start a pipeline update with REPLACE WHERE predicate overrides."""
    client = WorkspaceClient()
    body = {
        "pipeline_id": pipeline_id,
        "cause": "JOB_TASK",
        "update_cause_details": {
            "job_details": {"performance_target": "PERFORMANCE"}
        },
        "replace_where_overrides": replace_where_overrides,
    }
    if refresh_selection:
        body["refresh_selection"] = refresh_selection
    res = client.api_client.do(
        "POST",
        f"/api/2.0/pipelines/{pipeline_id}/updates",
        body=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    return StartUpdateResponse.from_dict(res)
Backfill with DML statements
You can run DML statements directly on the target table from outside the pipeline to perform initial loads or corrections, such as loading from a legacy table:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Rows inserted through DML are not subject to the REPLACE WHERE predicate and persist across scheduled refreshes unless they fall within the predicate range of a future run.
Full refresh behavior
A full refresh clears all existing data and re-executes the flow using only its defined predicate. If a pipeline has been running for a year with a 7-day predicate, a full refresh results in the table containing only the last 7 days of data. All older rows are permanently deleted.
To prevent full refreshes on a table, set the table property pipelines.reset.allowed to false. See Pipeline properties reference.
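In Python, a minimal sketch could look like the following; the table_properties parameter on @dp.table is an assumption based on the equivalent Delta Live Tables decorator:

from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(
    # Assumption: @dp.table accepts a table_properties mapping of Delta
    # table properties, as the Delta Live Tables decorator does.
    table_properties={"pipelines.reset.allowed": "false"},
    replace_where=F.col("date") >= F.date_sub(F.current_date(), 7),
)
def orders_enriched():
    orders_fct = spark.read.table("orders_fct").select(
        "product_id", "date", "order_id", "region", "qty", "price"
    )
    return orders_fct.join(spark.read.table("product_dim"), "product_id")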
Limitations
- The target table must be created within the pipeline.
- Only one REPLACE WHERE flow is allowed per target table.
- A table targeted by a REPLACE WHERE flow can't also be targeted by another flow type, such as an AUTO CDC flow or an append flow.
- Expectations are not supported on tables targeted by REPLACE WHERE flows.
- For streaming tables created in Databricks SQL, see REPLACE WHERE flows in Databricks SQL for syntax and backfill differences.
Examples
Keep historical aggregates from a limited-retention source
This example maintains daily aggregates indefinitely, even after raw data ages out of the source table (3-day retention):
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
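The same pattern in Python, as a sketch using the replace_where parameter described earlier:

from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(
    # Replace only the 3-day window still present in the source; older
    # aggregates in events_agg are left untouched.
    replace_where=F.col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
    return (
        spark.read.table("events_raw")
        .groupBy("date", "key")
        .agg(F.sum("val").alias("agg"))
    )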
Prevent recomputation when a dimension table changes
This example keeps historical fact rows unchanged when dimension attributes change:
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;
If a user's region changes, only recent rows are recomputed. Historical rows retain the region value at the time they were written. You can correct historical rows later with a targeted backfill using predicate overrides.
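For example, a one-time correction could use the start_update_with_replace_where helper defined earlier; the date range is illustrative, and the flow name is assumed to match the table name as in the inline definitions above:

overrides = [
    {
        "flow_name": "fact_dim_join",
        # Illustrative range: recompute rows from July 2025 onward so they
        # pick up the current region values.
        "predicate_override": "date >= '2025-07-01'",
    }
]
resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["fact_dim_join"],
)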
Add a new metric without recomputing full history
This example shows how to evolve a table definition and backfill only a targeted range:
1. Define the initial table:

CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  page_id,
  COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL;

2. Update the query to add uniq_users:

CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  page_id,
  COUNT(*) AS clicks,
  COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;

3. Backfill the new metric for the last 30 days:

overrides = [
    {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
    }
]
resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["clickstream_daily"],
)

Rows older than the backfilled range contain NULL for uniq_users.