Clean and validate data with batch or stream processing

Cleaning and validating data is essential for ensuring the quality of data assets in a lakehouse. This article outlines Databricks product offerings designed to facilitate data quality and provides recommendations for defining business logic to implement custom rules.

Schema enforcement on Databricks

Delta Lake enforces schema and constraint checks on write, which provides guarantees around data quality for tables in a lakehouse.

Schema enforcement ensures that data written to a table adheres to a predefined schema. Schema validation rules vary by operation. See Schema enforcement.

To handle schema evolution, Delta provides mechanisms for making schema changes and evolving tables. It is important to carefully consider when to use schema evolution to avoid dropped fields or failed pipelines. For details on manually or automatically updating schemas, see Update Delta Lake table schema.
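As a minimal sketch of both behaviors, the statements below assume a hypothetical Delta table named events_demo (the table and its columns are illustrative and not part of this article). The commented-out INSERT is expected to be rejected by schema enforcement because it supplies a value for a column the table does not have. The ALTER TABLE statement then evolves the schema manually so that the same write succeeds.

```sql
-- Hypothetical table used only for illustration.
CREATE TABLE events_demo (
  event_id BIGINT,
  event_timestamp TIMESTAMP
);

-- Expected to fail: the row carries three values, but the table has two columns.
-- INSERT INTO events_demo VALUES (1, now(), 'mobile');

-- Manually evolve the schema by adding the new column.
ALTER TABLE events_demo ADD COLUMNS (channel STRING);

-- The same write now matches the table schema.
INSERT INTO events_demo VALUES (1, now(), 'mobile');
```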

Table constraints

Constraints can take the form of informational primary key and foreign key constraints, or enforced constraints. See ADD CONSTRAINT clause.

Table constraints on Databricks are either enforced or informational.

Enforced constraints include NOT NULL and CHECK constraints.

Informational constraints include primary key and foreign key constraints.

See Constraints on Databricks.
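The following sketch shows how the two kinds of constraints might be declared together. It assumes a Unity Catalog schema and uses hypothetical table, column, and constraint names. The NOT NULL clauses are enforced on write, while the primary key and foreign key constraints are informational only and are not checked against the data.

```sql
-- Hypothetical parent table: NOT NULL is enforced, PRIMARY KEY is informational.
CREATE TABLE customers_demo (
  customer_id BIGINT NOT NULL,
  email STRING,
  CONSTRAINT customers_demo_pk PRIMARY KEY (customer_id)
);

-- Hypothetical child table: the FOREIGN KEY documents the relationship
-- but does not reject rows that reference a missing customer.
CREATE TABLE orders_demo (
  order_id BIGINT NOT NULL,
  customer_id BIGINT,
  quantity INT,
  CONSTRAINT orders_demo_pk PRIMARY KEY (order_id),
  CONSTRAINT orders_demo_fk FOREIGN KEY (customer_id) REFERENCES customers_demo
);
```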

Deal with null or missing values

NOT NULL constraints can be enforced on Delta tables. You can enable one on an existing table only if no existing records in the column are null. Once enabled, the constraint prevents new records with null values in that column from being inserted into the table.
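As an illustration, the statements below assume a hypothetical existing Delta table named readings_demo with a sensor_id column. The first query checks whether any existing rows would block the constraint, and the ALTER TABLE statements add and later remove it.

```sql
-- Count existing rows that would prevent the constraint from being enabled.
SELECT count(*) AS null_rows
FROM readings_demo
WHERE sensor_id IS NULL;

-- Enable the constraint once the count above is zero.
ALTER TABLE readings_demo ALTER COLUMN sensor_id SET NOT NULL;

-- Remove the constraint if null values need to be allowed again.
ALTER TABLE readings_demo ALTER COLUMN sensor_id DROP NOT NULL;
```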

Pattern enforcement

Regular expressions (regex) can be used to enforce expected patterns in a data field. This is particularly useful when dealing with textual data that needs to adhere to specific formats or patterns.

To enforce a pattern using regex, you can use the REGEXP or RLIKE operators in SQL. These operators match a data field against a specified regex pattern.

Here’s an example of how to use the CHECK constraint with regex for pattern enforcement in SQL:

CREATE TABLE table_name (
  column_name STRING CHECK (column_name REGEXP '^[A-Za-z0-9]+$')
);
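Before relying on a pattern rule, it can help to see which existing values would violate it. The query below is a sketch that reuses the placeholder names table_name and column_name from the example above and lists the non-null values that do not match the pattern.

```sql
-- List values that contain anything other than ASCII letters and digits.
SELECT column_name
FROM table_name
WHERE column_name IS NOT NULL
  AND NOT (column_name RLIKE '^[A-Za-z0-9]+$');
```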

Value enforcement

Constraints can be used to enforce value ranges on columns in a table. This ensures that only valid values within the specified range are allowed to be inserted or updated.

To enforce a value range constraint, you can use the CHECK constraint in SQL. The CHECK constraint allows you to define a condition that must be true for every row in the table.

Here’s an example of how to use the CHECK constraint to enforce a value range on a column:

CREATE TABLE table_name (
  column_name INT CHECK (column_name >= 0 AND column_name <= 100)
);
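A CHECK constraint can also be added to a table that already exists by using ALTER TABLE ... ADD CONSTRAINT. The sketch below reuses the placeholder names from the example above, and the constraint name column_name_in_range is hypothetical. Existing rows are expected to satisfy the condition for the statement to succeed.

```sql
-- Add a named range constraint to an existing table.
ALTER TABLE table_name
  ADD CONSTRAINT column_name_in_range CHECK (column_name BETWEEN 0 AND 100);

-- Drop the constraint by name if the rule is later retired.
-- ALTER TABLE table_name DROP CONSTRAINT column_name_in_range;
```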

Define and configure expectations using Delta Live Tables

Delta Live Tables allows you to define expectations when declaring materialized views or streaming tables. You can choose to configure expectations to warn you about violations, drop violating records, or fail workloads based on violations. See Manage data quality with pipeline expectations.
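The sketch below shows one way the three behaviors might be declared in a SQL pipeline. The target table name silver_events_demo and the constraint names are hypothetical, the source is assumed to be a table named bronze_table with event_timestamp, quantity, and weight columns, and the exact keywords can vary between releases. An expectation with no ON VIOLATION clause only records violations, DROP ROW discards the violating records, and FAIL UPDATE stops the update.

```sql
CREATE OR REFRESH STREAMING TABLE silver_events_demo (
  -- Warn only: violations are recorded, but rows are kept.
  CONSTRAINT has_weight EXPECT (weight IS NOT NULL),
  -- Drop rows with a negative quantity.
  CONSTRAINT non_negative_quantity EXPECT (quantity >= 0) ON VIOLATION DROP ROW,
  -- Fail the update if a record has no timestamp.
  CONSTRAINT has_timestamp EXPECT (event_timestamp IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM(bronze_table);
```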

Data monitoring

Databricks provides data quality monitoring services, which let you monitor the statistical properties and quality of the data in all of the tables in your account. See Introduction to Databricks Lakehouse Monitoring.

Cast data types

When inserting or updating data in a table, Databricks casts data types when it can do so safely without losing information.
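When source values arrive as strings, one way to find records that cannot be cast is to compare the raw value with the result of try_cast, which returns NULL instead of raising an error when a conversion fails. The sketch below assumes a hypothetical table bronze_raw_demo with a STRING column raw_quantity.

```sql
-- Return rows whose raw_quantity is present but is not a valid INT.
SELECT
  raw_quantity,
  try_cast(raw_quantity AS INT) AS quantity
FROM bronze_raw_demo
WHERE raw_quantity IS NOT NULL
  AND try_cast(raw_quantity AS INT) IS NULL;
```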

See the following articles for details about casting behaviors:

Custom business logic

You can use filters and WHERE clauses to define custom logic that quarantines bad records and prevents them from propagating to downstream tables. CASE WHEN ... ELSE expressions allow you to define conditional logic that handles records violating expectations in predictable ways.

DECLARE current_time = now();

INSERT INTO silver_table
  SELECT * FROM bronze_table
  WHERE event_timestamp <= current_time AND quantity >= 0;

INSERT INTO quarantine_table
  SELECT * FROM bronze_table
  WHERE event_timestamp > current_time OR quantity < 0;

Note

Databricks recommends always processing filtered data as a separate write operation, especially when using Structured Streaming. Using .foreachBatch to write to multiple tables can lead to inconsistent results.

For example, you might have an upstream system that isn’t capable of encoding NULL values, so the placeholder value -1 is used to represent missing data. Rather than writing custom logic for all downstream queries in Databricks to ignore records containing -1, you could use a CASE WHEN expression to replace these values as part of a transformation.

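INSERT INTO silver_table
  SELECT
    -- Select every column except weight, then re-add weight as the last column
    -- with the -1 placeholder replaced by NULL.
    * EXCEPT (weight),
    CASE
      WHEN weight = -1 THEN NULL
      ELSE weight
    END AS weight
  FROM bronze_table;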