Best practices for Lakeflow Spark Declarative Pipelines

This page describes recommended patterns for designing, building, and operating pipelines with Lakeflow Spark Declarative Pipelines. Apply these guidelines when starting a new pipeline or improving an existing one.

Choose the right dataset type

Lakeflow Spark Declarative Pipelines offers three dataset types: streaming tables, materialized views, and temporary views. Choosing the right type for each layer of your pipeline avoids unnecessary compute costs and keeps your code easy to reason about.

Streaming tables are the right choice for data ingestion and low-latency streaming transformations. Each input row is read and processed only once, which makes them ideal for append-only workloads, high-volume data, and event-driven processing from cloud storage or message buses.

Materialized views are the right choice for complex transformations and analytical queries. Their results are pre-computed and kept up to date using incremental refresh, so queries against them are fast. You can't directly modify the data in a materialized view — the query definition controls the output.

Temporary views are pipeline-scoped views that organize your transformation logic without materializing any data to storage. Use them for intermediate steps that don't need their own table.

The following table summarizes when to use each type:

| Use case | Recommended type | Reason |
| --- | --- | --- |
| Ingestion from cloud storage or a message bus | Streaming table | Processes each record once; handles high volume and append-only workloads. |
| CDC streams (inserts, updates, deletes) | Streaming table | Used as the target of APPLY CHANGES INTO for ordered, deduplicated CDC ingestion. |
| Complex aggregations and joins | Materialized view | Incrementally refreshed; avoids full recomputation on each update. |
| Dashboard query acceleration | Materialized view | Pre-computed results make queries faster than against raw tables. |
| Intermediate transformations (no downstream readers) | Temporary view | Organizes pipeline logic without incurring storage cost. |

For more information, see Streaming tables, Materialized views, and Lakeflow Spark Declarative Pipelines concepts.

Use declarative CDC instead of imperative MERGE

Implementing change data capture (CDC) with imperative SQL MERGE statements requires significant custom code to handle event ordering, deduplication, partial updates, and schema evolution correctly. Each of these concerns must be solved independently, and the resulting code is difficult to maintain and test.

Lakeflow Spark Declarative Pipelines provides the APPLY CHANGES INTO statement (SQL) and apply_changes() function (Python), which handle ordering, deduplication, out-of-order events, and schema evolution declaratively. You describe the shape of the change feed and the target table, and the pipeline handles the rest. APPLY CHANGES INTO supports both SCD Type 1 (overwrite) and SCD Type 2 (history preservation). Newer releases expose the same functionality under the AUTO CDC name (AUTO CDC INTO in SQL, create_auto_cdc_flow() in Python), which is the naming used in the documentation linked below.

For more information, see What is change data capture (CDC)? and The AUTO CDC APIs: Simplify change data capture with pipelines.
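
To make concrete what the declarative CDC APIs take off your hands, the following plain-Python sketch mimics SCD Type 1 semantics on an in-memory change feed. For each key it keeps only the change with the highest sequence value, and it removes the key when that change is a delete. It does not use Spark or the pipelines API. The function name apply_changes_scd1 and the field names (id, seq, op) are hypothetical and chosen only for illustration.

```python
def apply_changes_scd1(changes, key="id", sequence_by="seq"):
    """Return the final table state as {key: row} after applying a change feed.

    Illustrative only: mimics SCD Type 1 behavior (the latest change wins).
    """
    latest = {}
    for change in changes:
        k = change[key]
        # Keep a change only if it is newer than the one already seen for this
        # key, so arrival order does not matter and duplicates are ignored.
        if k not in latest or change[sequence_by] > latest[k][sequence_by]:
            latest[k] = change

    # A key whose newest change is a delete is absent from the target.
    # The bookkeeping columns (op and the sequence column) are not stored.
    return {
        k: {col: val for col, val in row.items() if col not in ("op", sequence_by)}
        for k, row in latest.items()
        if row["op"] != "DELETE"
    }


# Hypothetical feed: it arrives out of order and contains a duplicate.
feed = [
    {"id": 1, "seq": 2, "op": "UPDATE", "name": "Ada L."},
    {"id": 1, "seq": 1, "op": "INSERT", "name": "Ada"},
    {"id": 2, "seq": 1, "op": "INSERT", "name": "Bob"},
    {"id": 2, "seq": 3, "op": "DELETE", "name": None},
    {"id": 1, "seq": 2, "op": "UPDATE", "name": "Ada L."},
    {"id": 3, "seq": 1, "op": "INSERT", "name": "Cy"},
]

target = apply_changes_scd1(feed)
```

Here target keeps the updated row for key 1 and the inserted row for key 3, and key 2 is gone because its newest change is a delete. A hand-written MERGE has to reproduce each of these rules explicitly, which is the maintenance burden described above.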

Enforce data quality with expectations

Expectations are true/false SQL expressions applied to every row passing through a dataset. When a row fails the condition, the pipeline responds according to the violation policy you've configured. Expectations emit metrics to the pipeline event log regardless of the policy, so you can track data quality trends over time.

Choose a violation policy

Three violation policies are available. Pick the one that matches your tolerance for bad data:

  • warn (default): Invalid records are written to the target table and counted in metrics. Use this policy when you need to capture all data but want visibility into quality issues.
  • drop: Invalid records are discarded before writing. Use this policy when bad rows are expected and shouldn't propagate downstream.
  • fail: The pipeline update stops on the first invalid record. Use this policy for critical data where any bad record indicates a serious upstream problem.

The following examples show each policy applied to a streaming table:

SQL
-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Quarantine invalid records

When you want to preserve dropped records for investigation rather than silently discarding them, use a quarantine pattern. Route rows that fail validation to a separate streaming table by using two flows: one that drops invalid rows from the main table and a second that writes only the invalid rows to a quarantine table. This lets you investigate, fix, and reprocess bad data without contaminating your clean dataset.
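
The routing logic behind the quarantine pattern can be sketched in plain Python. This is a minimal illustration, not pipeline code: it evaluates a set of named rules against each row and sends the row either to the clean output or to a quarantine output, recording which rules failed. The helper split_by_rules, the rule names, and the sample rows are all hypothetical.

```python
def split_by_rules(rows, rules):
    """Split rows into (valid, quarantined) using named predicate rules."""
    valid, quarantined = [], []
    for row in rows:
        failed = [name for name, check in rules.items() if not check(row)]
        if failed:
            # Record which rules failed so the row can be investigated later.
            quarantined.append({**row, "failed_rules": failed})
        else:
            valid.append(row)
    return valid, quarantined


# Hypothetical rules, mirroring the kind of conditions used in expectations.
rules = {
    "valid_order_id": lambda r: r["order_id"] is not None,
    "non_negative_amount": lambda r: r["amount"] >= 0,
}

orders = [
    {"order_id": 1, "amount": 25.0},
    {"order_id": None, "amount": 10.0},
    {"order_id": 3, "amount": -5.0},
]

valid, quarantined = split_by_rules(orders, rules)
```

Every input row lands in exactly one of the two outputs, which is the property the two-flow design relies on: the main table and the quarantine table apply the same rules with opposite filters.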

For a detailed example of the quarantine pattern, see Expectation recommendations and advanced patterns.

For more information about expectations, see Manage data quality with pipeline expectations.

Parameterize your pipelines

Pipelines have default catalog and schema settings, so code that reads and writes within the same catalog and schema works across environments without any parameters. However, if your pipeline needs to reference a second catalog or schema — for example, reading from a shared source catalog that differs between development and production — avoid hardcoding those names directly in your source code. Instead, define them as pipeline configuration parameters (key-value pairs set in the pipeline settings) and reference them in your code. This lets a single codebase run correctly across environments by swapping the parameter values.

SQL
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

For more information, see Use parameters with pipelines.
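
As a rough mental model of parameter substitution (not the pipeline's actual implementation), the following sketch resolves the same query text against two sets of parameter values using Python's standard string.Template. The environment names and the catalog values dev_shared and prod_shared are hypothetical.

```python
from string import Template

# One query definition, shared by every environment.
QUERY = Template(
    "SELECT account_id, COUNT(txn_id) AS txn_count "
    "FROM ${source_catalog}.sales.transactions "
    "GROUP BY account_id"
)

# Hypothetical per-environment parameter values.
ENVIRONMENTS = {
    "dev": {"source_catalog": "dev_shared"},
    "prod": {"source_catalog": "prod_shared"},
}

# Resolve the query once per environment.
resolved = {env: QUERY.substitute(params) for env, params in ENVIRONMENTS.items()}
```

Only the parameter values differ between environments; the query text itself is never edited. In this sketch a missing parameter raises KeyError, so a misconfigured environment surfaces immediately rather than producing a query against the wrong catalog.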

Choose the right pipeline mode for each environment

Development and production update modes

Pipelines run in either development or production update mode. Choose the mode that matches your goal.

In development mode, the pipeline reuses a long-running cluster across updates and doesn't retry on errors. This speeds up the iteration cycle when you're authoring and testing pipeline code because you get error details immediately without waiting for cluster restarts.

In production mode, the cluster shuts down promptly after each update completes, which reduces compute costs. The pipeline also applies escalating retries, including cluster restarts, to handle transient infrastructure failures automatically. Use production mode for all scheduled pipeline runs.

Triggered vs. continuous pipeline mode

Triggered mode processes all available data and then stops. It's the right choice for the vast majority of pipelines: those that run on a schedule (hourly, daily, or on demand) and don't require sub-minute data freshness.

Continuous mode keeps the cluster running and processes new data as it arrives. It's appropriate only when your use case requires latency in the seconds-to-minutes range. Because continuous mode requires an always-on cluster, it's significantly more expensive than triggered mode.

For more information, see Triggered vs. continuous pipeline mode and Configure Pipelines.

Use liquid clustering for data layout

Liquid clustering replaces static partitioning and ZORDER for optimizing data layout in Delta tables. Unlike partitioning, which requires you to choose a partition column up front and can cause data skew when values are unevenly distributed, liquid clustering is self-tuning, skew-resistant, and incremental — only the data that needs reorganization is rewritten on each run.

Change clustering columns at any time without rewriting the full table as query patterns evolve.

Define clustering columns in your streaming table definition:

SQL
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

If you're not sure which columns to cluster by, use CLUSTER BY AUTO to let Databricks automatically select clustering columns based on your query workload.

For more information, see Streaming tables and Use liquid clustering for tables.

Manage pipelines with CI/CD and Databricks Asset Bundles

Version-control your pipeline source code and use Databricks Asset Bundles to manage deployments across environments.

For more information, see Create a source-controlled pipeline, Convert a pipeline into a Databricks Asset Bundle project, and Use parameters with pipelines.

Store pipeline code in version control

Store all pipeline source files (Python and SQL) alongside your bundle configuration in a Git repository. Version-controlling the full project gives you a complete history of changes, makes collaboration easier, and lets you validate changes in a development environment before promoting them to production.

Databricks recommends Databricks Asset Bundles for managing this workflow. A bundle defines your pipeline configuration in YAML alongside your source code, and the databricks bundle CLI commands let you validate, deploy, and run pipelines from your terminal or a CI/CD system.

Use bundle targets for environment isolation

Bundles support multiple targets (for example, dev, staging, prod), each with its own set of overrides for catalog names, cluster policies, notification addresses, and other settings. Combine bundle targets with pipeline parameters to inject the correct environment-specific values at deploy time, keeping your source code free of environment constants.

A typical workflow looks like this:

  1. A developer works on a feature branch, deploying to a personal development pipeline in a dev catalog.
  2. On merge to the main branch, a CI system runs databricks bundle validate and databricks bundle deploy --target staging to validate and deploy the pipeline to a staging environment.
  3. After testing passes, the CI system deploys to production with databricks bundle deploy --target prod.

Streaming best practices

Use these patterns to manage state, control late data, and keep streaming pipelines reliable.

For more information, see Optimize stateful processing with watermarks, Recover a pipeline from streaming checkpoint failure, and Backfilling historical data with pipelines.

Use watermarks for stateful operations

Watermarks bound the state that the pipeline keeps in memory during stateful streaming operations such as windowed aggregations and deduplication. Without a watermark, state grows unbounded as the pipeline accumulates data for every possible key, eventually causing out-of-memory errors on long-running pipelines.

A watermark specifies a timestamp column and a tolerance threshold for late data. Records that arrive after the threshold has passed are dropped. Choose a threshold that balances your tolerance for late data against the memory cost of keeping that state open.
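
The late-data rule can be illustrated with a simplified plain-Python model. It assumes the watermark equals the largest event time seen so far minus the delay threshold, and that it advances after every record; a real streaming engine advances the watermark per micro-batch, so treat this as a sketch of the idea rather than of the engine's exact behavior. The function drop_late_events and the sample events are hypothetical.

```python
from datetime import datetime, timedelta


def drop_late_events(events, delay):
    """Return the events that arrive no later than the watermark allows."""
    kept = []
    max_event_time = None
    for event in events:
        # Watermark = latest event time seen so far minus the allowed delay.
        if max_event_time is not None and event["event_time"] < max_event_time - delay:
            continue  # Too late: state for this time range is already closed.
        kept.append(event)
        if max_event_time is None or event["event_time"] > max_event_time:
            max_event_time = event["event_time"]
    return kept


def at(minute):
    """Build a timestamp at 12:<minute> on an arbitrary sample day."""
    return datetime(2024, 1, 1, 12, minute)


# Events in arrival order; "c" and "d" arrive out of order.
events = [
    {"id": "a", "event_time": at(0)},
    {"id": "b", "event_time": at(5)},
    {"id": "c", "event_time": at(3)},   # 2 minutes behind the maximum: kept
    {"id": "d", "event_time": at(1)},   # 4 minutes behind the maximum: dropped
    {"id": "e", "event_time": at(10)},
]

kept_ids = [e["id"] for e in drop_late_events(events, timedelta(minutes=3))]
```

With a three-minute threshold, event "c" is still accepted but event "d" is discarded. A larger threshold would keep "d" at the cost of holding state for older time ranges longer.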

The following example computes a one-minute tumbling window aggregation with a three-minute watermark:

SQL
CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Note: To ensure that aggregations are processed incrementally rather than fully recomputed on each update, you must define a watermark.

Understand streaming state and full refresh

Streaming state is incremental: the pipeline builds and maintains state across updates rather than recomputing from scratch each time. This is what makes stateful streaming efficient, but it also means that if you change the logic of a stateful query (for example, modifying a watermark threshold or changing aggregation columns), the existing state is no longer compatible with the new logic. In this case, you must perform a full refresh to reprocess all historical data with the new logic and rebuild the state from scratch.

A full refresh can also lead to data loss if the source does not retain historical data. For example, a Kafka source with a short retention period may only have the last few minutes of data available at the time of the refresh, resulting in a table that contains far less data than before. Plan stateful query logic changes carefully, especially for high-volume streams where a full refresh is expensive or where the source has limited data retention. The medallion architecture reduces this risk: bronze tables that ingest data with minimal transformation retain the full history, so silver and gold tables can be recomputed from bronze rather than from the source.

Stream-stream joins

Stream-stream joins require a watermark on both sides of the join and a time-bounded join condition. The time interval in the join condition tells the streaming engine when no further matches are possible, allowing it to evict state that can no longer be matched. If you omit either the watermarks or the time-bound condition, state grows without bound.

The following example joins ad impression events with click events, requiring the click to occur within three minutes of the impression:

SQL
CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
  WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
  WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
  ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

When you join a stream against a static table (a snapshot join), the static table snapshot is refreshed at the start of each microbatch. This means late-arriving dimension records are not retroactively applied to facts that were already processed. If retroactive application is required, use a materialized view or restructure the pipeline.

Optimize pipeline performance

Apply these techniques to reduce compute costs and speed up pipeline updates.

For more information, see Materialized views and Optimize stateful processing with watermarks.

Avoid small files

Triggering a pipeline too frequently on a low-volume source writes a large number of small files to cloud storage. Small files degrade read performance because each file requires a separate metadata lookup and I/O round trip, and cloud storage APIs throttle listing operations at scale. To avoid this, choose a trigger interval that matches your data volume: run triggered pipelines on a schedule that allows a meaningful amount of data to accumulate between updates, rather than continuously.
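
A back-of-the-envelope calculation shows how strongly the trigger interval affects file size. The sketch below assumes, purely for illustration, that each update writes a fixed number of files and that data arrives at a steady rate; the helper name and the 1,440 MB per day volume are hypothetical.

```python
def average_file_size_mb(daily_volume_mb, trigger_interval_minutes, files_per_update=1):
    """Estimate the average size of the files written per day, in MB."""
    updates_per_day = (24 * 60) / trigger_interval_minutes
    files_per_day = updates_per_day * files_per_update
    return daily_volume_mb / files_per_day


daily_volume_mb = 1440  # Hypothetical source producing about 1 MB per minute.

every_minute = average_file_size_mb(daily_volume_mb, 1)   # 1,440 files per day
hourly = average_file_size_mb(daily_volume_mb, 60)        # 24 files per day
```

Under these assumptions, a one-minute interval produces files of about 1 MB each, while an hourly interval produces files of about 60 MB. The same daily volume is spread over 60 times fewer files.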

Handle data skew

Data skew occurs when values in a join or groupBy key are unevenly distributed across partitions, causing a small number of tasks to process the majority of the data. This creates hotspots that increase end-to-end update time. Use liquid clustering to address skew in stored tables. For skew that occurs during in-flight computation, salt highly skewed keys: append a random bucket suffix to the key, aggregate on the salted key, and then aggregate those partial results on the original key.
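
The two-stage salting technique can be shown without Spark. In this plain-Python sketch, stage one aggregates on a (key, salt) pair so that a hot key is spread across several groups, and stage two discards the salt and combines the partial sums. The function name salted_sum, the bucket count, and the sample data are hypothetical; in a real pipeline each stage would be a grouped aggregation.

```python
import random
from collections import defaultdict


def salted_sum(rows, num_buckets=4, seed=0):
    """Sum values per key in two stages, using a random salt in stage one."""
    rng = random.Random(seed)

    # Stage 1: aggregate on (key, salt). A hot key is split across up to
    # num_buckets groups, so no single group holds all of its rows.
    partial = defaultdict(int)
    for key, value in rows:
        salt = rng.randrange(num_buckets)
        partial[(key, salt)] += value

    # Stage 2: drop the salt and combine the partial sums per original key.
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal

    return dict(totals), dict(partial)


# Hypothetical skewed input: one key accounts for almost all rows.
rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10

totals, partial = salted_sum(rows)
hot_groups = [group for group in partial if group[0] == "hot"]
```

The final totals are identical to a direct aggregation, but the rows for the hot key are divided among several stage-one groups. The trade-off is an extra aggregation step, so salting is usually worth applying only to keys that are known to be skewed.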

For more information, see Use liquid clustering for data layout.

Use incremental refresh for materialized views

When you use a materialized view for a large aggregation, Lakeflow Spark Declarative Pipelines attempts to incrementally refresh it — processing only the upstream changes since the last update rather than recomputing the full result set. Incremental refresh is significantly cheaper than rerunning the query from scratch on each pipeline trigger. To maximize the chance that a materialized view can be refreshed incrementally, write simple, deterministic aggregation queries and avoid constructs that prevent incremental processing, such as non-deterministic functions.

See Incremental refresh for materialized views.

Optimize joins

For joins where one side is a small dimension table, add a broadcast hint to instruct Spark to broadcast the smaller table to all executors instead of performing a shuffle join:

SQL
CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
-- The hint must directly follow SELECT; placed elsewhere, it is parsed as an ordinary comment.
SELECT /*+ BROADCAST(p) */ o.*, p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

For time-series proximity joins (for example, finding the nearest event within a time range), use a range join condition and ensure both sides have a watermark if joining streams, or consider pre-binning events into time buckets before joining.
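
Pre-binning can be sketched in plain Python. The idea is to assign each event to a fixed-width time bucket and compare an event only with candidates in its own and adjacent buckets, instead of with every event on the other side. The sketch assumes integer timestamps in seconds and a bucket width equal to the maximum allowed gap; the function name and sample values are hypothetical.

```python
from collections import defaultdict


def bucketed_proximity_join(left_times, right_times, max_gap):
    """Pair each left time with every right time within max_gap seconds."""
    # Index the right side by bucket. The bucket width equals max_gap, so
    # any match for a left event lies in its own bucket or a neighboring one.
    buckets = defaultdict(list)
    for right in right_times:
        buckets[right // max_gap].append(right)

    matches = []
    for left in left_times:
        bucket = left // max_gap
        for candidate_bucket in (bucket - 1, bucket, bucket + 1):
            for right in buckets.get(candidate_bucket, []):
                # The exact range condition still applies within the candidates.
                if abs(left - right) <= max_gap:
                    matches.append((left, right))
    return matches


# Hypothetical event times, in seconds.
impressions = [100, 400, 1000]
clicks = [130, 290, 395, 580, 1181]

pairs = bucketed_proximity_join(impressions, clicks, max_gap=180)
```

The bucket lookup replaces a comparison against every click with a comparison against a few nearby candidates, and the exact range check then removes candidates that share a bucket but fall outside the gap. In SQL, the same shape is an equality join on the bucket column combined with the range predicate.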

Monitor your pipelines

The pipeline event log is the primary observability primitive in Lakeflow Spark Declarative Pipelines. Every pipeline run writes structured records to the event log covering execution progress, data quality expectation results, data lineage, and error details. The event log is a Delta table that you can query directly.

To query the event log without knowing the underlying storage path, use the event_log() table-valued function on a shared cluster or SQL warehouse:

SQL
SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Build data quality dashboards by querying the event log for the expectation metrics. The details column contains a nested JSON structure with pass/fail counts for each constraint, which you can use to track quality trends over time and alert on regressions.
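
The following sketch shows one way to total pass and fail counts per constraint once the details values have been fetched as JSON strings. It assumes the expectation metrics sit under flow_progress.data_quality.expectations with the fields name, dataset, passed_records, and failed_records; verify that layout against your own event log before relying on it. The sample rows are made up for illustration.

```python
import json
from collections import defaultdict


def summarize_expectations(details_rows):
    """Total passed and failed record counts per (dataset, expectation)."""
    totals = defaultdict(lambda: {"passed": 0, "failed": 0})
    for raw in details_rows:
        details = json.loads(raw)
        # Events without data quality metrics contribute nothing.
        expectations = (
            details.get("flow_progress", {})
            .get("data_quality", {})
            .get("expectations", [])
        )
        for exp in expectations:
            key = (exp["dataset"], exp["name"])
            totals[key]["passed"] += exp["passed_records"]
            totals[key]["failed"] += exp["failed_records"]
    return dict(totals)


def sample_event(passed, failed):
    """Build a made-up details value in the assumed layout."""
    return json.dumps({"flow_progress": {"data_quality": {"expectations": [{
        "name": "non_negative_amount",
        "dataset": "orders_clean",
        "passed_records": passed,
        "failed_records": failed,
    }]}}})


details_rows = [
    sample_event(100, 2),
    sample_event(80, 3),
    json.dumps({"flow_progress": {"status": "RUNNING"}}),  # No metrics here.
]

summary = summarize_expectations(details_rows)
```

From totals like these you can derive a failure rate per constraint and alert when it exceeds a threshold that you choose.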

For event-driven alerting, use event hooks to trigger custom webhooks or notification services (such as Slack or PagerDuty) when a pipeline fails or when a data quality threshold is breached. Event hooks are Python functions that run in response to pipeline events.

For more information, see Monitor pipelines, Pipeline event log, and Define custom monitoring of pipelines with event hooks.

Use serverless compute

Databricks recommends serverless compute for new pipelines. With serverless, there's no manual cluster configuration — Databricks manages the infrastructure automatically. Serverless pipelines use enhanced autoscaling that can scale both horizontally (more executors) and vertically (larger executor size) in response to workload demands. Serverless pipelines always use Unity Catalog, so governance and lineage tracking are built in by default.

For more information, see Configure a serverless pipeline.

Organize pipelines with the medallion architecture

The medallion architecture organizes data into three logical layers — bronze, silver, and gold — each with a distinct purpose. Mapping Lakeflow Spark Declarative Pipelines dataset types to the right layer keeps each layer's responsibilities clear and makes pipelines easier to maintain.

  • Bronze: Use streaming tables to ingest raw data from cloud storage, message buses, or CDC sources. Bronze tables preserve the raw source data with minimal transformation, making it possible for silver or gold layers to reprocess from the source in the bronze layer if requirements change.
  • Silver: Use streaming tables for incremental row-level transformations (filtering, cleaning, and parsing). Use materialized views when silver-layer logic involves enrichment joins against dimension tables or complex aggregations that benefit from incremental refresh.
  • Gold: Use materialized views to pre-compute aggregations, metrics, and summaries served to dashboards, reporting tools, and downstream consumers.

Separate ingestion (bronze) and transformation (silver and gold) into distinct pipeline DAGs whenever possible. Decoupling the layers lets you schedule, monitor, and troubleshoot each layer independently, and a failure in a transformation pipeline doesn't block new data from landing in bronze.

For more information, see Streaming tables and Materialized views.