ETL in Databricks SQL

When dealing with large amounts of data, you need a pipeline that can process only the new and changed records instead of reprocessing the entire dataset. This is called incremental ETL. In Databricks SQL, you can build incremental ETL pipelines using streaming tables and materialized views, without writing procedural code or scheduling manual refreshes.

This tutorial walks you through a common pattern: tracking product changes over time. You create a source table, capture change events, build a dimension table that preserves the full history of each product, and add an aggregate reporting layer on top.

The key feature in this tutorial is AUTO CDC. In a traditional warehouse, you would write complex MERGE INTO statements to reconcile insert, update, and delete events into a target table. This approach is error-prone, especially when events arrive out of order. AUTO CDC handles this for you. You declare the business key, the sequencing column, and whether you want SCD Type 1 (latest value only) or SCD Type 2 (full history), and Databricks applies the correct merge logic automatically. For an overview of CDC, see The AUTO CDC APIs: Simplify change data capture with pipelines.

By the end of this tutorial, you will have:

  1. Created a source table that tracks changes with Change Data Feed.
  2. Inspected the raw change data to understand the CDC event stream.
  3. Used AUTO CDC to build an SCD Type 2 dimension table from those events.
  4. Processed delete events incrementally through the pipeline.
  5. Created a materialized view that incrementally maintains an aggregate report.
  6. Configured SCHEDULE REFRESH EVERY 1 DAY so changes propagate automatically through the pipeline.

Requirements

To complete this tutorial, you must meet the following requirements:

Step 1: Set up your catalog and schema

Open the Databricks SQL editor and set your working catalog and schema. You need the USE CATALOG and USE SCHEMA privileges on the catalog and schema you select:

SQL
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

Step 2: Create a source table and load data

Create a products table with Delta Lake change data feed (CDF) enabled. CDF is a Delta Lake feature that records every insert, update, and delete as a queryable change log. This is similar to a CDC stream from a transactional source system, except that the changes are captured directly within the Delta table rather than read from an external log. You use CDF here to generate the change events that the downstream pipeline consumes.

  1. Create the table and load initial records:

    SQL
    CREATE OR REPLACE TABLE products (
      product_id INT,
      product_name STRING,
      category STRING,
      warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);

    INSERT INTO products VALUES
    (1, 'Spoon', 'Cutlery', 'Seattle'),
    (2, 'Fork', 'Cutlery', 'Portland'),
    (3, 'Knife', 'Cutlery', 'Denver'),
    (4, 'Chair', 'Furniture', 'Austin'),
    (5, 'Table', 'Furniture', 'Chicago'),
    (6, 'Lamp', 'Lighting', 'Boston'),
    (7, 'Mug', 'Kitchenware', 'Seattle'),
    (8, 'Plate', 'Kitchenware', 'Atlanta'),
    (9, 'Bowl', 'Kitchenware', 'Dallas'),
    (10, 'Glass', 'Kitchenware', 'Phoenix');
  2. Simulate upstream changes, including new products, a warehouse move, and a category reassignment:

    SQL
    INSERT INTO products VALUES
    (11, 'Napkin', 'Dining', 'San Francisco'),
    (12, 'Coaster', 'Dining', 'New York');

    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;

Step 3: Query the change data feed

Before building the downstream pipeline, it helps to look at the raw change events so you can understand what AUTO CDC will process. The table_changes() function reads the CDF log and returns every captured operation along with metadata columns:

SQL
SELECT
  product_id, product_name, warehouse,
  _change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
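
Filtered to the Spoon (product_id = 1), the result looks roughly like this; the exact _commit_version values depend on your table's commit history:

product_id | product_name | warehouse   | _change_type     | _commit_version
-----------|--------------|-------------|------------------|----------------
1          | Spoon        | Seattle     | insert           | 1
1          | Spoon        | Seattle     | update_preimage  | 3
1          | Spoon        | Los Angeles | update_postimage | 3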

For example, the Spoon has three events: an insert (Seattle), an update_preimage (Seattle), and an update_postimage (Los Angeles).

Notice that a single logical change (for example, moving the Spoon to a different warehouse) produces multiple events: a preimage and a postimage. In a traditional warehouse, you would write a MERGE statement to reconcile all of these events into a target table, handling inserts, updates, and deletes with separate logic, and making sure events are applied in the correct order. This is exactly the complexity that AUTO CDC eliminates in the next step.
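
For comparison, the hand-written reconciliation for even the simpler SCD Type 1 case (latest value only) might look roughly like the following sketch. This is illustrative only; products_latest is a hypothetical target table, not part of this tutorial:

SQL
-- Illustrative sketch: reconcile CDF events into a hypothetical
-- SCD Type 1 target table that keeps the latest value per key.
MERGE INTO products_latest AS t
USING (
  -- Preimages carry no new state, so drop them.
  SELECT * FROM table_changes('products', 1)
  WHERE _change_type IN ('insert', 'update_postimage', 'delete')
) AS s
ON t.product_id = s.product_id
WHEN MATCHED AND s._change_type = 'delete' THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET t.product_name = s.product_name,
             t.category = s.category,
             t.warehouse = s.warehouse
WHEN NOT MATCHED AND s._change_type != 'delete' THEN
  INSERT (product_id, product_name, category, warehouse)
  VALUES (s.product_id, s.product_name, s.category, s.warehouse);

Even this sketch is fragile: if a batch contains more than one event for the same key, MERGE fails with a duplicate-match error, and nothing here applies late-arriving events in the correct order. SCD Type 2 requires further bookkeeping to open and close version rows. AUTO CDC handles all of this declaratively.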

Step 4: Build an SCD Type 2 dimension with AUTO CDC

Beta

AUTO CDC is in Beta. Requires Databricks Runtime 17.3 or above.

A streaming table processes data incrementally. On each refresh, it reads only the new rows since the last run, so it does not need to reprocess the full dataset. This makes it well-suited for high-volume or frequently changing sources.

AUTO CDC adds change data capture processing on top of a streaming table. Instead of writing a MERGE INTO statement that manually handles inserts, updates, and deletes, you declare the business key and sequencing column and let Databricks apply the correct logic. AUTO CDC also handles out-of-order events automatically, a common problem when events arrive from distributed systems or from batch loads with overlapping timestamps.

The following statement creates an SCD Type 2 table that preserves the full version history of each product. Each version gets __START_AT and __END_AT timestamps. A NULL in __END_AT marks the current version.

SQL
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY: refreshes the table on a daily schedule.
  • FLOW AUTO CDC: declares this as a CDC flow. Databricks applies insert, update, and delete semantics automatically.
  • KEYS (product_id): the business key. Events with the same key are merged into versioned rows.
  • APPLY AS DELETE WHEN _change_type = 'delete': closes out the current version when a delete event arrives. This lets you define the condition that identifies a delete event.
  • SEQUENCE BY _commit_timestamp: establishes event ordering. Handles out-of-order arrivals correctly.
  • STORED AS SCD TYPE 2: retains full history. AUTO CDC supports both SCD Type 1 and SCD Type 2.
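
If you needed only the latest value per product rather than the full history, the same statement with STORED AS SCD TYPE 1 would maintain one current row per key. This is a hypothetical variant named products_latest, not part of this tutorial's pipeline:

SQL
CREATE OR REFRESH STREAMING TABLE products_latest
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 1;

With SCD Type 1, updates overwrite the existing row and deletes remove it, so the table has no __START_AT and __END_AT columns.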

Query the dimension table:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • Spoon: two versions. Seattle (closed, __END_AT set) and Los Angeles (current, __END_AT = NULL).
  • Fork: two versions. Cutlery category (closed) and Dining category (current).
  • Napkin and Coaster: one version each (newly inserted, __END_AT = NULL).
  • All other products: one version each (__END_AT = NULL).

Step 5: Process deletes through the pipeline

Now simulate two discontinued products by deleting them from the source table:

SQL
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

These delete events are recorded in the CDF log, but the streaming table has not seen them yet. Refresh the streaming table to process the new events:

SQL
REFRESH STREAMING TABLE products_history;

Query the dimension table to verify the deletes were applied:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

Bowl and Glass are now closed out with __END_AT set, marking them as discontinued. All other current products remain unchanged. The streaming table processed only the new delete events, without reprocessing the inserts and updates from the previous refresh.

Step 6: Create an aggregate materialized view

Now that you have a dimension table that stays current with source changes, you can add a reporting layer on top.

A materialized view stores pre-computed query results as a physical table. Unlike a regular view, which re-executes the query each time you read from it, a materialized view persists the results and only recomputes the rows affected by upstream changes on each refresh. This makes it well-suited for dashboards and reports where query performance matters.

SQL
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
  category,
  COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY means this view refreshes on a daily schedule. Combined with the same schedule on the streaming table, you now have a three-stage pipeline where changes to the source table cascade through the dimension and into the aggregate on each refresh cycle. There is no manual refresh to run.

SQL
SELECT * FROM products_by_category ORDER BY active_products DESC;

Step 7: Verify the end-to-end cascade

To verify the full pipeline cascade, make a change to the source table:

SQL
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

The Knife moves from Denver to Seattle. This single DML change flows through all three stages of the pipeline:

  1. products records the change event via CDF.
  2. products_history processes the event and adds a new version for the Knife.
  3. products_by_category recomputes only the affected Cutlery row.

Verify:

SQL
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

Clean up

To clean up the resources created by this tutorial, use the following SQL:

SQL
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

Additional resources