
Unit testing for pipelines

Beta

This feature is in Beta.

For general information about Python unit testing in Databricks, see Python unit testing.

Lakeflow Spark Declarative Pipelines supports writing Python unit tests in the web-based Lakeflow Pipelines Editor. This lets you validate Python or SQL transformation logic against mock data. With the pipeline testing framework, you can test edge cases, validate pipeline-specific APIs (Auto CDC, streaming tables, expectations, append flows), and iterate quickly without affecting live datasets.

  • Isolated test execution: The framework provides primitives for creating a SparkSession connected to a fully isolated catalog, so you can mock input data within tests.
  • Flexible test scope: Execute a subset of a pipeline (individual tables, chains of dependent tables, or entire pipelines) on the pipeline's compute using the test SparkSession.
  • Result validation: Verify the results of isolated output tables created in a test using standard pytest assertions.

When to use unit testing

Typical use cases include:

  • Validating new transformation logic: Test that your transformation produces the expected schema, row counts, aggregations, and business logic before running against production data.
  • Testing Auto CDC specifications: Use mock data to validate that your Auto CDC flow definitions correctly process change events, including inserts, updates, deletes, and SCD (Slowly Changing Dimension) types.
  • Testing expectations and data quality rules: Verify that expectations fail when they should and pass when data is valid.
  • Testing across dependent tables: Test chains of transformations (for example, bronze, silver, and gold) to validate that data flows correctly through your pipeline DAG.

Requirements

  • can_run permission on the pipeline, plus USE_CATALOG and CREATE_SCHEMA permissions in the pipeline's default catalog.
  • Pipeline must be configured in triggered (non-continuous) mode.
  • Pipeline must be on the PREVIEW channel. Unit testing is in Beta and is only available on PREVIEW.

Limitations

  • Editor-only execution: Tests must be run from the web-based Lakeflow Pipelines Editor.
  • Python tests only: Tests must be written in Python. You can test SQL pipelines, but the tests themselves must be Python.
  • Governance limitations: Row filters and column masks set at the catalog or schema level are not applied to mocked data.
  • No concurrent execution: Do not run tests while the pipeline is running an update. It may severely impact the performance of your production workload.

Step 1: Update pipeline settings

Configure the pipeline to run on the PREVIEW channel in triggered mode.

  1. In the UI, open your pipeline and click Settings > Advanced settings > Channel > Preview.
  2. Set Pipeline mode to Triggered (do not use Continuous).

Alternatively, edit the pipeline settings JSON directly:

JSON
"continuous": false,
"channel": "PREVIEW"

Step 2: Create a test file

In the Lakeflow Pipelines Editor, click Edit pipeline > Add pipeline assets > Test. This creates a tests folder containing a test file. Test files are not included in your pipeline source code.


Alternatively, create the folder and file manually:

  1. Click Edit pipeline > Add pipeline assets > New pipeline folder.
  2. Name the folder tests.
  3. Click the tests folder, then Create file > Python.
  4. Name your file using the pattern test_*.py or *_test.py (for example, test_transformations.py).
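These two naming patterns are the same as pytest's default test-file patterns (its python_files setting). A minimal sketch using the standard library's fnmatch module shows which file names match; whether the editor uses exactly this matching internally is an assumption.

```python
from fnmatch import fnmatchcase

# pytest's default test-file patterns, matching the naming rule above.
TEST_FILE_PATTERNS = ("test_*.py", "*_test.py")


def is_test_file(filename: str) -> bool:
    """Return True if the file name matches one of the test-file patterns (case-sensitive)."""
    return any(fnmatchcase(filename, pattern) for pattern in TEST_FILE_PATTERNS)


for name in ["test_transformations.py", "transformations_test.py", "transformations.py", "testing_utils.py"]:
    print(name, is_test_file(name))
```

Note that testing_utils.py does not match: the prefix pattern requires the underscore in test_.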

Step 3: Generate tests

Genie Code can generate test scaffolding:

  • Inside the test file, click the Generate tests button.


  • Alternatively, use /tests inside Genie Code agent mode.


Use Genie Code to generate boilerplate, then customize for your edge cases.

Alternatively, you can write the test code yourself. Add the following imports to the top of each test file:

Python
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark

test_pipeline = TestPipeline.active()

Step 4: Run tests

Run tests from the Lakeflow Pipelines Editor:

  • Click the Play button in the gutter next to a test function to run that test.
  • Click Run tests in file at the top of the test file to run all tests in that file.

Test results (success or failure) appear in the lower panel of the editor. Review assertion errors to debug failures.

Testing APIs

  • TestPipeline.active(): Returns a TestPipeline object for the pipeline currently being edited in the Lakeflow Pipelines Editor. This object is a reference to the pipeline, including its source code, configurations, and default catalog and schema.
  • test_pipeline.run(test_spark, set([table_names])): Synchronously executes an update of the pipeline. If table names are specified, only those tables are refreshed (a selective refresh); otherwise, all tables are refreshed. Returns after the update succeeds, or terminates with an exception if it fails.
  • test_spark fixture: A pytest fixture that creates an isolated SparkSession. The session automatically redirects all table reads and writes to a temporary test schema, so production data is never affected.

Create mock data

You can mock input data using either SQL or createDataFrame:

Python
# Option 1: Using SQL
test_spark.sql("""
CREATE TABLE catalog.schema.table_name AS
SELECT * FROM VALUES
(1, 'value1'),
(2, 'value2')
AS t(id, name)
""")

# Option 2: Using createDataFrame
df = test_spark.createDataFrame(
    [(1, 'value1'), (2, 'value2')],
    schema=["id", "name"]
)
df.write.saveAsTable("catalog.schema.table_name")
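When mock rows already live in Python, writing the SQL for Option 1 by hand gets tedious. A minimal sketch of a hypothetical helper (values_table_sql is not part of the testing framework) that renders Python tuples into the same CREATE TABLE ... AS SELECT * FROM VALUES form, turning None into NULL and escaping quotes with backslashes as Spark SQL string literals allow:

```python
def sql_literal(value) -> str:
    """Render a Python value as a Spark SQL literal (covers None, bool, int, and str)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Escape backslashes first, then single quotes, using backslash escapes.
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"
    raise TypeError(f"unsupported literal type: {type(value).__name__}")


def values_table_sql(table: str, columns: list, rows: list) -> str:
    """Hypothetical helper: build CREATE TABLE ... AS SELECT * FROM VALUES SQL."""
    values = ",\n".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return (
        f"CREATE TABLE {table} AS\n"
        "SELECT * FROM VALUES\n"
        f"{values}\n"
        f"AS t({', '.join(columns)})"
    )


sql = values_table_sql("catalog.schema.table_name", ["id", "name"], [(1, "value1"), (2, None)])
print(sql)
# In a test: test_spark.sql(sql)
```

A column that contains only NULL values may not get a useful type from VALUES, so mix NULLs with typed values, as the examples below do.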

To generate larger volumes of realistic synthetic data, you can use the Faker library. Run %pip install faker in your pipeline first, then build a DataFrame from Faker-backed UDFs:

Python
# Option 3: Using Faker for synthetic data
from pyspark.sql import functions as F
from faker import Faker

fake = Faker()
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)

df = (
    test_spark.range(0, 100)
    .withColumn("firstname", fake_firstname())
    .withColumn("lastname", fake_lastname())
    .withColumn("email", fake_email())
)
df.write.saveAsTable("catalog.schema.table_name")
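Because assertions depend on the generated values, it helps to make synthetic data reproducible across test runs. The following sketch swaps Faker for the standard library's random module with a fixed seed, so every run produces the same rows; the name pools and the synthetic_users helper are hypothetical. The resulting tuples can be passed to test_spark.createDataFrame as in Option 2.

```python
import random

# Hypothetical name pools standing in for Faker providers.
FIRST_NAMES = ["Alice", "Bob", "Charlie", "Dana", "Eve"]
LAST_NAMES = ["Smith", "Jones", "Garcia", "Chen", "Okafor"]


def synthetic_users(n: int, seed: int = 42) -> list:
    """Generate n (id, firstname, lastname, email) tuples; the same seed gives the same rows."""
    rng = random.Random(seed)  # private generator, unaffected by global random state
    rows = []
    for i in range(n):
        first = rng.choice(FIRST_NAMES)
        last = rng.choice(LAST_NAMES)
        # Include the id so every email is unique even when names repeat.
        email = f"{first.lower()}.{last.lower()}{i}@example.com"
        rows.append((i, first, last, email))
    return rows


rows = synthetic_users(100)
# In a test:
# df = test_spark.createDataFrame(rows, schema=["id", "firstname", "lastname", "email"])
# df.write.saveAsTable("catalog.schema.table_name")
```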

Run the pipeline or specific tables

Python
# Run specific tables
test_pipeline.run(test_spark, set(["catalog.schema.table1", "catalog.schema.table2"]))

# Run all tables in the pipeline
test_pipeline.run(test_spark)
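When you refresh only some tables, the examples below pass upstream tables explicitly (Example 1's aggregation test runs both users and counts because counts reads users). A minimal sketch of computing that set from a hand-maintained dependency map follows. The map and the tables_to_refresh helper are hypothetical; they are not read from the pipeline.

```python
def tables_to_refresh(dependencies: dict, targets: set) -> set:
    """Return the target tables plus every pipeline table they depend on, transitively.

    dependencies maps each pipeline table to the pipeline tables it reads. Mocked
    source tables are left out because the test creates them directly.
    """
    selected = set()
    pending = list(targets)
    while pending:
        table = pending.pop()
        if table in selected:
            continue  # already visited; also guards against cycles in the map
        selected.add(table)
        pending.extend(dependencies.get(table, ()))
    return selected


# Hand-maintained (hypothetical) dependency map for Example 1's pipeline.
deps = {
    "catalog.schema.users": set(),  # reads only the mocked source table
    "catalog.schema.counts": {"catalog.schema.users"},
}
print(sorted(tables_to_refresh(deps, {"catalog.schema.counts"})))
# ['catalog.schema.counts', 'catalog.schema.users']
# In a test: test_pipeline.run(test_spark, tables_to_refresh(deps, {"catalog.schema.counts"}))
```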

Examples

Example 1: Testing aggregations with row count, schema, and null handling

Goal: Validate that the user aggregation correctly counts users by type, handles null emails, and produces the expected schema.

Pipeline transformations:

These transformations create a simple two-table pipeline: users selects user columns from a source table, and counts groups users by user_type and computes the total number of users and the number of valid (non-null) emails.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, count_if

@dp.table
def users():
    return (
        spark.read.table("catalog.schema.wanderbricks_users")
        .select("user_id", "email", "name", "user_type")
    )

@dp.table
def counts():
    return (
        spark.read.table("catalog.schema.users")
        .withColumn("valid_email", col("email").isNotNull())
        .groupBy("user_type")
        .agg(
            count("user_id").alias("total_count"),
            count_if("valid_email").alias("count_valid_emails")
        )
    )
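To derive expected values independently of Spark, you can model the counts logic in plain Python. This sketch (the expected_counts helper is hypothetical) reproduces the numbers that Tests 4 and 5 below assert for the mock data, and its output could also be turned into the expected rows for assertDataFrameEqual.

```python
from collections import defaultdict


def expected_counts(users: list) -> dict:
    """Plain-Python model of the counts table.

    users rows are (user_id, email, name, user_type), mirroring the users table.
    Returns {user_type: (total_count, count_valid_emails)}.
    """
    totals = defaultdict(lambda: [0, 0])
    for user_id, email, _name, user_type in users:
        group = totals[user_type]  # every row creates its group, as groupBy does
        if user_id is not None:  # count("user_id") ignores NULL user_id values
            group[0] += 1
        if email is not None:  # count_if(valid_email), where valid_email = email IS NOT NULL
            group[1] += 1
    return {user_type: tuple(pair) for user_type, pair in totals.items()}


# The same rows that mock_users() inserts in the tests.
mock_rows = [
    (1, "alice@example.com", "Alice", "admin"),
    (2, None, "Bob", "user"),
    (3, "charlie@example.com", "Charlie", "user"),
    (4, None, "Dana", "admin"),
]
print(expected_counts(mock_rows))  # {'admin': (2, 1), 'user': (2, 1)}
```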

Tests:

These tests validate row counts, schema structure, null handling, and aggregation logic by creating mock user data with intentional nulls and running the pipeline in isolation.

Python
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark
from pyspark.testing import assertDataFrameEqual

test_pipeline = TestPipeline.active()

# Helper (a plain function, not a pytest fixture) that creates the mock source table
def mock_users(session):
    session.sql("""
        CREATE TABLE catalog.schema.wanderbricks_users AS
        SELECT * FROM VALUES
        (1, 'alice@example.com', 'Alice', 'admin'),
        (2, NULL, 'Bob', 'user'),
        (3, 'charlie@example.com', 'Charlie', 'user'),
        (4, NULL, 'Dana', 'admin')
        AS t(user_id, email, name, user_type)
    """)

# Test 1: Row count
def test_users_row_count(test_spark):
    mock_users(test_spark)
    test_pipeline.run(test_spark, set(["catalog.schema.users"]))
    result = test_spark.table("catalog.schema.users")
    assert result.count() == 4

# Test 2: Schema validation
def test_users_schema(test_spark):
    mock_users(test_spark)
    test_pipeline.run(test_spark, set(["catalog.schema.users"]))
    result = test_spark.table("catalog.schema.users")
    expected_fields = {"user_id", "email", "name", "user_type"}
    actual_fields = set(f.name for f in result.schema.fields)
    assert expected_fields == actual_fields

# Test 3: Null handling
def test_users_null_handling(test_spark):
    mock_users(test_spark)
    test_pipeline.run(test_spark, set(["catalog.schema.users"]))
    result = test_spark.table("catalog.schema.users")
    null_emails = result.filter("email IS NULL").count()
    assert null_emails == 2

# Test 4: Aggregation
def test_counts(test_spark):
    mock_users(test_spark)
    # Run both tables since counts depends on users
    test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
    result = test_spark.table("catalog.schema.counts")
    # Check counts for each user_type
    admin_row = result.filter("user_type = 'admin'").collect()[0]
    user_row = result.filter("user_type = 'user'").collect()[0]
    assert admin_row["total_count"] == 2
    assert admin_row["count_valid_emails"] == 1
    assert user_row["total_count"] == 2
    assert user_row["count_valid_emails"] == 1

# Test 5: Full DataFrame comparison with assertDataFrameEqual
def test_counts_full_dataframe(test_spark):
    mock_users(test_spark)
    test_pipeline.run(test_spark, set(["catalog.schema.users", "catalog.schema.counts"]))
    result = test_spark.table("catalog.schema.counts")
    expected = test_spark.createDataFrame(
        [("admin", 2, 1), ("user", 2, 1)],
        schema=["user_type", "total_count", "count_valid_emails"]
    )
    assertDataFrameEqual(result, expected)

Example 2: Testing Auto CDC

Goal: Validate that Auto CDC correctly processes a change feed containing inserts and updates.

Pipeline transformation:

This transformation sets up Auto CDC from a change feed, which reads streaming changes and applies them to the target table as SCD Type 1 (keeps only the latest version).

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.view
def users():
    return spark.readStream.table("catalog.schema.change_feed")

dp.create_streaming_table("target_autocdc")
dp.create_auto_cdc_flow(
    target="target_autocdc",
    source="users",
    keys=["userId"],
    sequence_by=col("ts"),
    stored_as_scd_type=1
)
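The SCD Type 1 behavior that the tests below check can be modeled in a few lines of plain Python: for each key, keep only the event with the highest sequence_by value, regardless of the order in which events arrive. This is a simplified, hypothetical model (apply_scd1 is not a Databricks API) that covers upserts only, not deletes or ties in ts.

```python
def apply_scd1(state: dict, events: list) -> dict:
    """Apply (userId, name, ts) change events to state, a dict of userId -> (name, ts).

    An event wins only if its ts is newer than the ts already stored for that key,
    so late-arriving stale events cannot overwrite newer state.
    """
    new_state = dict(state)
    for user_id, name, ts in events:
        current = new_state.get(user_id)
        if current is None or ts > current[1]:
            new_state[user_id] = (name, ts)
    return new_state


# Mirror the two pipeline runs in the late-arriving test.
state = apply_scd1({}, [(1, "Alice", 1000), (2, "Bob", 1001)])
state = apply_scd1(state, [(1, "Alice Updated", 1003), (2, "Bob (stale)", 999)])
print(state)  # {1: ('Alice Updated', 1003), 2: ('Bob', 1001)}
```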

Tests:

The first test creates a mock change feed with multiple records for the same userId (simulating an update) and verifies that only the latest record is retained in the target. The second test simulates late-arriving and out-of-order events by running the pipeline, appending more events to the change feed, and running the pipeline again.

Python
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark

test_pipeline = TestPipeline.active()

# Test 1: Standard inserts and updates
def test_auto_cdc_flow(test_spark):
    # Create a mock change feed table
    test_spark.sql("""
        CREATE TABLE catalog.schema.change_feed AS
        SELECT * FROM VALUES
        (1, 'Alice', 1000),
        (2, 'Bob', 1001),
        (1, 'Alice Updated', 1002)
        AS t(userId, name, ts)
    """)
    # Run the pipeline
    test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))
    # Read the output
    result = test_spark.table("catalog.schema.target_autocdc")
    # Verify two users exist
    user_ids = set(row["userId"] for row in result.collect())
    assert user_ids == {1, 2}
    # Verify latest record for userId=1 has ts=1002
    latest_user1 = result.filter("userId = 1").collect()[0]
    assert latest_user1["ts"] == 1002
    assert latest_user1["name"] == "Alice Updated"
    # Verify userId=2 has ts=1001
    user2 = result.filter("userId = 2").collect()[0]
    assert user2["ts"] == 1001

# Test 2: Late-arriving and out-of-order events
def test_auto_cdc_late_arriving(test_spark):
    # First batch of change events
    test_spark.sql("""
        CREATE TABLE catalog.schema.change_feed AS
        SELECT * FROM VALUES
        (1, 'Alice', 1000),
        (2, 'Bob', 1001)
        AS t(userId, name, ts)
    """)
    # Run the pipeline with the initial batch
    test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))

    # Append late-arriving events to the change feed:
    # - A newer event for userId=1 (ts=1003) that arrived after the first run
    # - A stale event for userId=2 (ts=999) with a timestamp older than what is already applied
    test_spark.sql("""
        INSERT INTO catalog.schema.change_feed VALUES
        (1, 'Alice Updated', 1003),
        (2, 'Bob (stale)', 999)
    """)
    # Re-run the pipeline. sequence_by=ts ensures stale events do not overwrite newer state.
    test_pipeline.run(test_spark, set(["catalog.schema.target_autocdc"]))

    result = test_spark.table("catalog.schema.target_autocdc")
    # userId=1 should reflect the newer late-arriving event
    alice = result.filter("userId = 1").collect()[0]
    assert alice["ts"] == 1003
    assert alice["name"] == "Alice Updated"
    # userId=2 should be unchanged: the stale event with an older ts is ignored
    bob = result.filter("userId = 2").collect()[0]
    assert bob["ts"] == 1001
    assert bob["name"] == "Bob"

Example 3: Testing Auto CDC from snapshot

Goal: Validate that CDC correctly processes snapshot changes including inserts, updates, and deletes.

Pipeline transformation:

This transformation sets up Auto CDC from snapshot, which reads from a snapshot table and tracks changes over time as SCD Type 2 (maintains full history).

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
    return spark.read.table("catalog.schema.snapshot")

dp.create_streaming_table("catalog.schema.target")
dp.create_auto_cdc_from_snapshot_flow(
    target="target",
    source="source",
    keys=["userId"],
    stored_as_scd_type=2
)

Test:

This test creates an initial snapshot and runs the pipeline, then simulates a new snapshot by truncating the table and inserting updated data: Alice is absent (a delete) and Bob's created_at changes (an update). It then verifies that CDC captures these changes.

Python
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark

test_pipeline = TestPipeline.active()

def test_auto_cdc_from_snapshot_flow(test_spark):
    # Create initial snapshot
    test_spark.sql("""
        CREATE TABLE catalog.schema.snapshot AS
        SELECT * FROM VALUES
        (1, 'Alice', '2024-01-01'),
        (2, 'Bob', '2024-01-02')
        AS t(userId, name, created_at)
    """)
    # Run the pipeline
    test_pipeline.run(test_spark, set(["catalog.schema.target"]))
    # Simulate a new snapshot by truncating and inserting updated data
    # (Alice is deleted; Bob's created_at is updated)
    test_spark.sql("TRUNCATE TABLE catalog.schema.snapshot")
    test_spark.sql("INSERT INTO catalog.schema.snapshot VALUES (2, 'Bob', '2024-01-03')")
    test_pipeline.run(test_spark, set(["catalog.schema.target"]))
    # Verify SCD Type 2: 3 rows (original Alice, now closed; original Bob, now closed; updated Bob)
    result = test_spark.table("catalog.schema.target")
    assert result.count() == 3
    user_ids = [row["userId"] for row in result.collect()]
    assert set(user_ids) == {1, 2}
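Why does the test expect three rows? The sketch below models snapshot-based SCD Type 2 in plain Python: each new snapshot is compared with the currently open versions; keys that changed or disappeared have their open version closed, and keys that changed or are new get a new open version. It is a simplified, hypothetical model (apply_snapshot_scd2 is not a Databricks API); its start and end fields play roughly the role of the __START_AT and __END_AT columns that SCD Type 2 targets carry.

```python
def apply_snapshot_scd2(history: list, snapshot: dict, version: int) -> list:
    """Diff a full snapshot against the open rows of an SCD Type 2 history.

    history rows are dicts with keys userId, data, start, end (end is None while open).
    snapshot maps userId -> data tuple for the complete new snapshot.
    """
    result = [dict(row) for row in history]
    # open_rows points at the same dicts as result, so closing them updates result.
    open_rows = {row["userId"]: row for row in result if row["end"] is None}
    for user_id, row in open_rows.items():
        if user_id not in snapshot or snapshot[user_id] != row["data"]:
            row["end"] = version  # deleted or changed: close the current version
    for user_id, data in snapshot.items():
        current = open_rows.get(user_id)
        if current is None or current["data"] != data:
            result.append({"userId": user_id, "data": data, "start": version, "end": None})
    return result


history = apply_snapshot_scd2([], {1: ("Alice", "2024-01-01"), 2: ("Bob", "2024-01-02")}, version=1)
history = apply_snapshot_scd2(history, {2: ("Bob", "2024-01-03")}, version=2)
print(len(history))  # 3
print([row["userId"] for row in history if row["end"] is None])  # [2]
```

Following this model, a stricter version of the test could also check that the only open version belongs to userId 2, assuming the target exposes an end-of-validity column such as __END_AT.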

Example 4: Testing joins and expectations

Goal: Validate that joins work correctly and expectations filter out invalid data.

Pipeline transformation:

This transformation joins property images with amenities and applies an expect_or_drop expectation that drops images whose uploaded_at is not after 2024-01-01.

Python
from pyspark import pipelines as dp

@dp.table
@dp.expect_or_drop("uploaded after Jan 2024", "uploaded_at > '2024-01-01'")
def property_images_amenities_join():
    return (
        spark.read.table("catalog.schema.property_images")
        .join(
            spark.read.table("catalog.schema.property_amenities"),
            on="property_id",
            how="inner"
        )
    )
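The expected result of the join and the expectation can be modeled in plain Python. This sketch (join_and_expect is a hypothetical helper) performs an inner join on property_id and then keeps only rows for which the expectation condition holds, mirroring how expect_or_drop removes violating rows. It also shows why comparing uploaded_at as a string works here: ISO-8601 dates (YYYY-MM-DD) sort lexicographically in date order.

```python
def join_and_expect(images: list, amenities: list, cutoff: str = "2024-01-01") -> list:
    """Inner-join images to amenities on property_id, then drop rows failing the expectation.

    images rows: (property_id, image_url, uploaded_at); amenities rows: (property_id, amenity).
    Output rows: (property_id, image_url, uploaded_at, amenity).
    """
    amenities_by_id = {}
    for property_id, amenity in amenities:
        amenities_by_id.setdefault(property_id, []).append(amenity)
    joined = [
        (pid, url, uploaded, amenity)
        for pid, url, uploaded in images
        for amenity in amenities_by_id.get(pid, [])  # no match: row dropped (inner join)
    ]
    # Keep only rows where uploaded_at > cutoff, compared as strings.
    return [row for row in joined if row[2] > cutoff]


images = [
    (101, "img1.jpg", "2024-02-01"),
    (102, "img2.jpg", "2024-01-15"),
    (103, "img3.jpg", "2024-12-20"),
    (104, "img4.jpg", "2023-12-31"),
]
amenities = [(101, "wifi"), (102, "pool"), (103, "parking"), (104, "gym")]
print(sorted({row[0] for row in join_and_expect(images, amenities)}))  # [101, 102, 103]
```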

Tests:

These tests verify that the join produces the correct number of rows and that the expectation successfully filters out records with invalid upload dates.

Python
import pytest
from pyspark.pipelines.testing import TestPipeline, test_spark

test_pipeline = TestPipeline.active()

# Mock property datasets
def mock_properties(session):
    session.sql("""
        CREATE TABLE catalog.schema.property_images AS
        SELECT * FROM VALUES
        (101, 'img1.jpg', '2024-02-01'),
        (102, 'img2.jpg', '2024-01-15'),
        (103, 'img3.jpg', '2024-12-20')
        AS t(property_id, image_url, uploaded_at)
    """)
    session.sql("""
        CREATE TABLE catalog.schema.property_amenities AS
        SELECT * FROM VALUES
        (101, 'wifi'),
        (102, 'pool'),
        (103, 'parking')
        AS t(property_id, amenity)
    """)

# Test 1: Join
def test_property_join(test_spark):
    mock_properties(test_spark)
    test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
    result = test_spark.table("catalog.schema.property_images_amenities_join")
    # Should have 3 rows after join
    assert result.count() == 3
    # Check all property_ids are present
    property_ids = set(row["property_id"] for row in result.collect())
    assert property_ids == {101, 102, 103}

# Test 2: Expectation
def test_property_expectation(test_spark):
    mock_properties(test_spark)
    # Add a row with uploaded_at before Jan 2024
    test_spark.sql("""
        INSERT INTO catalog.schema.property_images VALUES (104, 'img4.jpg', '2023-12-31')
    """)
    # Add a matching row in the amenities table for the join
    test_spark.sql("""
        INSERT INTO catalog.schema.property_amenities VALUES (104, 'gym')
    """)
    test_pipeline.run(test_spark, set(["catalog.schema.property_images_amenities_join"]))
    result = test_spark.table("catalog.schema.property_images_amenities_join")
    # Only property_ids with uploaded_at > '2024-01-01' should be present
    valid_ids = set(row["property_id"] for row in result.collect())
    assert 104 not in valid_ids
    assert valid_ids == {101, 102, 103}