REPLACE WHERE flows

Beta

REPLACE WHERE flows are in Beta.

This page describes how to use REPLACE WHERE flows in Lakeflow Spark Declarative Pipelines to recompute and overwrite a targeted subset of a table without reprocessing your entire table history. REPLACE WHERE flows handle late-arriving data, upstream reprocessing, schema evolution, and backfills.

With a REPLACE WHERE flow, you define a predicate on the target table. All rows matching the predicate are deleted and replaced by re-evaluating the source query for that same predicate range. Rows that don't match the predicate are left untouched.
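The delete-then-recompute semantics can be modeled in a few lines of plain Python. This is only an illustrative sketch, not how the pipeline engine is implemented: `replace_where`, the list-of-dicts table, and the sample rows are all hypothetical.

```python
from datetime import date
from typing import Callable

Row = dict


def replace_where(
    target: list[Row], source: list[Row], predicate: Callable[[Row], bool]
) -> list[Row]:
    """Model of one REPLACE WHERE run (hypothetical helper, not a Databricks API)."""
    # Rows that don't match the predicate are left untouched.
    kept = [row for row in target if not predicate(row)]
    # Rows that match are replaced by the source rows in the same predicate range.
    recomputed = [row for row in source if predicate(row)]
    return kept + recomputed


target = [
    {"order_id": 1, "date": date(2025, 1, 1), "qty": 5},
    {"order_id": 2, "date": date(2025, 1, 9), "qty": 1},
]
# The source now has a corrected qty for order 2 and a late-arriving order 3.
# It also has a changed qty for order 1, which falls outside the predicate.
source = [
    {"order_id": 1, "date": date(2025, 1, 1), "qty": 99},
    {"order_id": 2, "date": date(2025, 1, 9), "qty": 2},
    {"order_id": 3, "date": date(2025, 1, 10), "qty": 7},
]

result = replace_where(target, source, lambda r: r["date"] >= date(2025, 1, 8))
```

In this model, order 1 keeps its original `qty` because it lies outside the predicate range, while orders 2 and 3 reflect the current source.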

Requirements

  • Your pipeline must use the PREVIEW channel.
  • Databricks recommends Unity Catalog and serverless compute for the best experience.

When to use REPLACE WHERE flows

REPLACE WHERE flows are well-suited for the following scenarios:

  • Incremental batch processing without streaming semantics: Process new rows in batches without streaming concepts such as watermarks.
  • Selective reprocessing: Recompute only rows that match a predicate while leaving all other rows untouched.
  • Scenarios beyond standard materialized view capabilities:
    • Target tables with longer retention than the source
    • Preventing recomputation when a dimension table changes
    • Schema evolution without recomputing entire history

Predicate design guidelines

Avoid REPLACE WHERE predicates on aggregates or derived columns. For example, a predicate like total_sales > 100000 where total_sales is a SUM() requires the engine to recompute the aggregation for all partitions on every run. Use predicates on base columns such as date or region so the engine can push down the filter to the source and process only the relevant data.
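To see why base-column predicates are cheaper, compare how many source rows each kind of predicate forces a run to read. The sketch below is a deliberately simplified cost model with hypothetical names (`SALES`, `run_with_base_predicate`, `run_with_aggregate_predicate`); the real engine's query planning is more involved.

```python
from collections import defaultdict

# Hypothetical source rows: (date, region, amount).
SALES = [
    ("2025-01-01", "emea", 60_000),
    ("2025-01-01", "asia", 70_000),
    ("2025-01-02", "emea", 20_000),
    ("2025-01-02", "asia", 10_000),
]


def total_by_date(rows):
    """Equivalent of SUM(amount) ... GROUP BY date."""
    totals = defaultdict(int)
    for day, _region, amount in rows:
        totals[day] += amount
    return dict(totals)


def run_with_base_predicate(rows, since):
    # A filter on a base column can be applied before aggregating,
    # so only the matching source rows are read.
    scanned = [r for r in rows if r[0] >= since]
    return total_by_date(scanned), len(scanned)


def run_with_aggregate_predicate(rows, threshold):
    # Whether a group matches is only known after the SUM is computed,
    # so every source row has to be read first.
    totals = total_by_date(rows)
    matching = {d: t for d, t in totals.items() if t > threshold}
    return matching, len(rows)


base_result, base_scanned = run_with_base_predicate(SALES, "2025-01-02")
agg_result, agg_scanned = run_with_aggregate_predicate(SALES, 100_000)
```

Here the base-column predicate reads 2 of the 4 source rows, while the aggregate predicate reads all 4 before it can decide which groups qualify.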

Create a REPLACE WHERE flow

You can define REPLACE WHERE flows in both SQL and Python.

Use the FLOW REPLACE WHERE clause inline with CREATE STREAMING TABLE:

SQL
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

You can also use the long-form CREATE FLOW syntax:

SQL
CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_timestamp(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

In these examples, all rows from the last 7 days are deleted from orders_enriched and recomputed from the source query. You don't need to add the predicate to the source query; the pipeline engine automatically applies it when reading from the source.

note

BY NAME is required in SQL. It matches columns by name rather than position.

Perform a backfill with predicate overrides

You can override the REPLACE WHERE predicate for a single pipeline update without modifying the pipeline definition. Predicate overrides are one-time, apply only to the current update, and don't affect future runs.

Example: Initial historical load

To perform a one-time backfill of historical data when first setting up a pipeline:

Python
pipeline_id = "<pipeline-id>"
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
    }
]

resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
)
print(resp)

note

The historical data must come from the same source as the incremental data. If you have historical data from a different source, use DML statements directly on the target table. See Backfill with DML statements.

Example: Correct a column for a specific period

After updating a column definition, backfill the change for a targeted historical range:

Python
pipeline_id = "<pipeline-id>"
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_timestamp(), -30)",
    }
]

resp = start_update_with_replace_where(
    pipeline_id=pipeline_id,
    replace_where_overrides=overrides,
    refresh_selection=["orders_enriched"],
)
print(resp)

You can also combine multiple dimensions in the predicate override:

Python
overrides = [
    {
        "flow_name": "orders_enriched",
        "predicate_override": "date >= date_add(current_timestamp(), -30) AND region = 'asia'",
    }
]
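When an override combines several conditions, assembling the predicate string from parts can reduce quoting and precedence mistakes. The `make_override` helper below is hypothetical and not part of any Databricks SDK; only the `flow_name` and `predicate_override` keys come from the examples above. It assumes the engine accepts parenthesized conditions, which is standard SQL.

```python
def make_override(flow_name: str, *conditions: str) -> dict:
    """Build one override entry by AND-ing SQL conditions (hypothetical helper)."""
    if not conditions:
        raise ValueError("at least one condition is required")
    # Parenthesize each part so an OR inside one condition can't change precedence.
    predicate = " AND ".join(f"({condition})" for condition in conditions)
    return {"flow_name": flow_name, "predicate_override": predicate}


overrides = [
    make_override(
        "orders_enriched",
        "date >= date_add(current_timestamp(), -30)",
        "region = 'asia'",
    )
]
```

The resulting list has the same shape as the hand-written `overrides` and can be passed as `replace_where_overrides` in the same way.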

Helper function: start_update_with_replace_where

Use the pipeline update API from a notebook to submit predicate overrides:

Python
from typing import Optional

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
    pipeline_id: str,
    replace_where_overrides: list[dict],
    refresh_selection: Optional[list[str]] = None,
) -> StartUpdateResponse:
    """Start a pipeline update with REPLACE WHERE predicate overrides."""
    # In a notebook, WorkspaceClient() picks up the default authentication.
    client = WorkspaceClient()

    body = {
        "pipeline_id": pipeline_id,
        "cause": "JOB_TASK",
        "update_cause_details": {
            "job_details": {"performance_target": "PERFORMANCE"}
        },
        "replace_where_overrides": replace_where_overrides,
    }

    # Restrict the update to specific tables only when a selection is given.
    if refresh_selection:
        body["refresh_selection"] = refresh_selection

    # Call the REST endpoint directly so the overrides are sent as-is.
    res = client.api_client.do(
        "POST",
        f"/api/2.0/pipelines/{pipeline_id}/updates",
        body=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )

    return StartUpdateResponse.from_dict(res)

Backfill with DML statements

You can run DML statements directly on the target table from outside the pipeline to perform initial loads or corrections, such as loading from a legacy table:

SQL
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Rows inserted through DML are not subject to the REPLACE WHERE predicate and persist across scheduled refreshes unless they fall within the predicate range of a future run.

Full refresh behavior

warning

A full refresh clears all existing data and re-executes the flow using only its defined predicate. If a pipeline has been running for a year with a 7-day predicate, a full refresh results in the table containing only the last 7 days of data. All older rows are permanently deleted.

To prevent full refreshes on a table, set the table property pipelines.reset.allowed to false. See Pipeline properties reference.
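The data loss described in the warning above can be checked with a small simulation. This is a simplified model with made-up dates, not the engine's actual behavior. It assumes the source still holds the full year of data, and `in_window` stands in for a 7-day predicate such as `date >= date_add(current_date(), -7)`.

```python
from datetime import date, timedelta

today = date(2025, 12, 31)  # hypothetical "current date" at refresh time
window_start = today - timedelta(days=7)


def in_window(day: date) -> bool:
    # Stand-in for: date >= date_add(current_date(), -7)
    return day >= window_start


# A year of daily runs has accumulated one row per day in the target.
target_days = [today - timedelta(days=n) for n in range(365)]
source_days = list(target_days)  # assumption: the source still holds every day

# Normal run: only rows in the window are replaced; older rows are kept.
after_normal_run = [d for d in target_days if not in_window(d)] + [
    d for d in source_days if in_window(d)
]

# Full refresh: the table is cleared, then only the window is recomputed.
after_full_refresh = [d for d in source_days if in_window(d)]
```

In this model the normal run leaves all 365 daily rows in place, while the full refresh leaves only the 8 calendar dates that satisfy the predicate (today plus the 7 preceding days), even though the source still contains the older data.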

Limitations

  • The target table must be created within the pipeline.
  • Only one REPLACE WHERE flow is allowed per target table.
  • A table targeted by a REPLACE WHERE flow can't also be targeted by another flow type, such as an AUTO CDC flow or an append flow.
  • Expectations are not supported on tables targeted by REPLACE WHERE flows.
  • For streaming tables created in Databricks SQL, see REPLACE WHERE flows in Databricks SQL for syntax and backfill differences.

Examples

Keep historical aggregates from a limited-retention source

This example maintains daily aggregates indefinitely, even after raw data ages out of the source table (3-day retention):

SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Prevent recomputation when a dimension table changes

This example keeps historical fact rows unchanged when dimension attributes change:

SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

If a user's region changes, only recent rows are recomputed. Historical rows retain the region value at the time they were written. You can correct historical rows later with a targeted backfill using predicate overrides.
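This snapshot behavior can be modeled with a few lines of plain Python. The data and the `run` function are hypothetical; `run` only imitates one REPLACE WHERE update and is not the engine's implementation.

```python
def run(target, facts, dim, since):
    """Imitate one update: recompute rows with date >= since, keep the rest."""
    kept = [row for row in target if row["date"] < since]
    recomputed = [
        {**fact, "region": dim[fact["user_id"]]}
        for fact in facts
        if fact["date"] >= since
    ]
    return kept + recomputed


facts = [
    {"date": "2025-03-01", "user_id": 1, "revenue": 10},
    {"date": "2025-03-02", "user_id": 1, "revenue": 20},
]

# Day 1: user 1 is in "emea" when the 2025-03-01 row is written.
table = run([], facts[:1], {1: "emea"}, since="2025-03-01")

# Day 2: the user has moved to "asia"; only rows from 2025-03-02 on are recomputed.
table = run(table, facts, {1: "asia"}, since="2025-03-02")

# A later targeted backfill with a wider range rewrites the historical row too.
backfilled = run(table, facts, {1: "asia"}, since="2025-03-01")
```

After the second run, `table` holds the 2025-03-01 row with region `emea` and the 2025-03-02 row with region `asia`. Only the wider backfill rewrites the older row to `asia`.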

Add a new metric without recomputing full history

This example shows how to evolve a table definition and backfill only a targeted range:

  1. Define the initial table:

    SQL
    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
  2. Update the query to add uniq_users:

    SQL
    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
  3. Backfill the new metric for a fixed 30-day range (January 1 through January 30, 2026):

    Python
    overrides = [
        {
            "flow_name": "clickstream_daily",
            "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
        }
    ]

    resp = start_update_with_replace_where(
        pipeline_id="<pipeline-id>",
        replace_where_overrides=overrides,
        refresh_selection=["clickstream_daily"],
    )

    Rows older than the backfilled range contain NULL for uniq_users.