Manage data quality with Delta Live Tables
You use expectations to define data quality constraints on the contents of a dataset. Expectations allow you to guarantee data arriving in tables meets data quality requirements and provide insights into data quality for each pipeline update. You apply expectations to queries using Python decorators or SQL constraint clauses.
What are Delta Live Tables expectations?
Expectations are optional clauses you add to Delta Live Tables dataset declarations that apply data quality checks on each record passing through a query.
An expectation consists of three things:
A description, which acts as a unique identifier and allows you to track metrics for the constraint.
A boolean statement that always returns true or false based on some stated condition.
An action to take when a record fails the expectation, meaning the boolean returns false.
The following matrix shows the three actions you can apply to invalid records:
Action |
Result |
---|---|
warn (default) |
Invalid records are written to the target; failure is reported as a metric for the dataset. |
Invalid records are dropped before data is written to the target; failure is reported as a metrics for the dataset. |
|
Invalid records prevent the update from succeeding. Manual intervention is required before re-processing. |
You can view data quality metrics such as the number of records that violate an expectation by querying the Delta Live Tables event log. See Monitor Delta Live Tables pipelines.
For a complete reference of Delta Live Tables dataset declaration syntax, see Delta Live Tables Python language reference or Delta Live Tables SQL language reference.
Note
While you can include multiple clauses in any expectation, only Python supports defining actions based on multiple expectations. See Multiple expectations.
Expectations must be defined using SQL expressions. You cannot use non-SQL syntax (for example, Python functions) when defining an expectation.
Retain invalid records
Use the expect
operator when you want to keep records that violate the expectation. Records that violate the expectation are added to the target dataset along with valid records:
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Drop invalid records
Use the expect or drop
operator to prevent further processing of invalid records. Records that violate the expectation are dropped from the target dataset:
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Fail on invalid records
When invalid records are unacceptable, use the expect or fail
operator to stop execution immediately when a record fails validation. If the operation is a table update, the system atomically rolls back the transaction:
@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Important
If you have multiple parallel flows defined in a pipeline, failure of a single flow does not cause other flows to fail.
When a pipeline fails because of an expectation violation, you must fix the pipeline code to handle the invalid data correctly before re-running the pipeline.
Fail expectations modify the Spark query plan of your transformations to track information required to detect and report on violations. For many queries, you can use this information to identify which input record resulted in the violation. The following is an example exception:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Multiple expectations
You can define expectations with one or more data quality constraints in Python pipelines. These decorators accept a Python dictionary as an argument, where the key is the expectation name and the value is the expectation constraint.
Use expect_all
to specify multiple data quality constraints when records that fail validation should be included in the target dataset:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Use expect_all_or_drop
to specify multiple data quality constraints when records that fail validation should be dropped from the target dataset:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Use expect_all_or_fail
to specify multiple data quality constraints when records that fail validation should halt pipeline execution:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
You can also define a collection of expectations as a variable and pass it to one or more queries in your pipeline:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Quarantine invalid data
The following example uses expectations in combination with temporary tables and views. This pattern provides you with metrics for records that pass expectation checks during pipeline updates, and provides a way to process valid and invalid records through different downstream paths.
Note
This example reads sample data included in the Databricks datasets. Because the Databricks datasets are not supported with a pipeline that publishes to Unity Catalog, this example works only with a pipeline configured to publish to the Hive metastore. However, this pattern also works with Unity Catalog enabled pipelines, but you must read data from external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Validate row counts across tables
You can add an additional table to your pipeline that defines an expectation to compare row counts between two materialized views or streaming tables. The results of this expectation appear in the event log and the Delta Live Tables UI. The following example validates equal row counts between the tbla
and tblb
tables:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Perform advanced validation with Delta Live Tables expectations
You can define materialized views using aggregate and join queries and use the results of those queries as part of your expectation checking. This is useful if you wish to perform complex data quality checks, for example, ensuring a derived table contains all records from the source table or guaranteeing the equality of a numeric column across tables.
The following example validates that all expected records are present in the report
table:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
The following example uses an aggregate to ensure the uniqueness of a primary key:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Make expectations portable and reusable
You can maintain data quality rules separately from your pipeline implementations.
Databricks recommends storing the rules in a Delta table with each rule categorized by a tag. You use this tag in dataset definitions to determine which rules to apply.
The following example creates a table named rules
to maintain rules:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
The following Python example defines data quality expectations based on the rules stored in the rules
table. The get_rules()
function reads the rules from the rules
table and returns a Python dictionary containing rules matching the tag
argument passed to the function. The dictionary is applied in the @dlt.expect_all_*()
decorators to enforce data quality constraints. For example, any records failing the rules tagged with validity
will be dropped from the raw_farmers_market
table:
Note
This example reads sample data included in the Databricks datasets. Because the Databricks datasets are not supported with a pipeline that publishes to Unity Catalog, this example works only with a pipeline configured to publish to the Hive metastore. However, this pattern also works with Unity Catalog enabled pipelines, but you must read data from external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
Instead of creating a table named rules
to maintain rules, you could create a Python module to main rules, for example, in a file named rules_module.py
in the same folder as the notebook:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Then modify the preceding notebook by importing the module and changing the get_rules()
function to read from the module instead of from the rules
table:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)