Delta Live Tables cookbook
This article contains a collection of recommendations and solutions to implement common tasks in your Delta Live Tables pipelines.
Make expectations portable and reusable
Scenario
You want to apply a common set of data quality rules to multiple tables, or the team members who develop and maintain data quality rules work separately from the pipeline developers.
Solution
Maintain data quality rules separately from your pipeline implementations. Store the rules in a format that is reliable and easy to access and update, for example, a text file stored in DBFS or cloud storage, or a Delta table. The following example uses a CSV file named rules.csv stored in DBFS to maintain rules. Each rule in rules.csv is categorized by a tag. You use this tag in dataset definitions to determine which rules to apply:
name,constraint,tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained
The following Python example defines data quality expectations based on the rules stored in the rules.csv file. The get_rules() function reads the rules from rules.csv and returns a Python dictionary containing rules matching the tag argument passed to the function. The dictionary is applied in the @dlt.expect_all_*() decorators to enforce data quality constraints. For example, any records failing the rules tagged with validity will be dropped from the raw_farmers_market table:
import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
  loads data quality rules from csv file
  :param tag: tag to match
  :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
              "Facebook", "Twitter", "Youtube", "Organic",
              "updateTime")
  )
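If you choose to maintain the rules in a Delta table rather than a CSV file, only the rule-loading function needs to change. The following is a minimal sketch, assuming a hypothetical table named quality_rules with the same name, constraint, and tag columns:
from pyspark.sql.functions import col

def get_rules_from_table(tag):
  """
  Loads data quality rules from a Delta table (hypothetical table name: quality_rules).
  :param tag: tag to match
  :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("quality_rules").filter(col("tag") == tag)
  return {row["name"]: row["constraint"] for row in df.collect()}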
Use Python UDFs in SQL
Scenario
You want the simplicity of SQL to define Delta Live Tables datasets but need transformations not directly supported in SQL.
Solution
Use a Python user-defined function (UDF) in your SQL queries. The following example defines and registers the square() UDF to return the square of the input argument and calls the square() UDF in a SQL expression.
Define and register the UDF.
Create a notebook with Default Language set to Python and add the following in a cell:
def square(i: int) -> int:
  """
  Simple udf for squaring the parameter passed.
  :param i: column from Pyspark or SQL
  :return: squared value of the passed param.
  """
  return i * i

spark.udf.register("makeItSquared", square)  # register the square udf for Spark SQL
Call the UDF.
Create a SQL notebook and add the following query in a cell:
CREATE OR REFRESH LIVE TABLE raw_squared AS SELECT makeItSquared(2) AS numSquared;
Create a pipeline
Create a new Delta Live Tables pipeline, adding the notebooks you created to Notebook Libraries. Use the Add notebook library button in the Create Pipeline dialog to add additional notebooks, or configure the notebooks with the libraries field in the Delta Live Tables settings.
Use MLflow models in a Delta Live Tables pipeline
Scenario
You want to use an MLflow-trained model in a pipeline.
Solution
To use an MLflow model in a Delta Live Tables pipeline:
Obtain the run ID and model name of the MLflow model. The run ID and model name are used to construct the URI of the MLflow model (see the sketch after this list).
Use the URI to define a Spark UDF to load the MLflow model.
Call the UDF in your table definitions to use the MLflow model.
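If you don't have the run ID handy, you can look it up programmatically with the MLflow client before constructing the URI. The following is a minimal sketch, not part of the original example; the experiment ID and the model artifact path are hypothetical:
import mlflow

# Hypothetical experiment ID; find it in the MLflow UI or with mlflow.get_experiment_by_name().
runs = mlflow.search_runs(experiment_ids=["1234"], order_by=["start_time DESC"], max_results=1)
run_id = runs.loc[0, "run_id"]                # most recent run in the experiment
model_uri = "runs:/{0}/model".format(run_id)  # "model" is assumed to be the logged artifact path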
The following example defines a Spark UDF named loaded_model that loads an MLflow model trained on loan risk data. The loaded_model UDF is then used to define the gtb_scoring_train_data and gtb_scoring_valid_data tables:
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
                "addr_state", "verification_status", "application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
            "revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return (
    dlt.read("train_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return (
    dlt.read("valid_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )
Create sample datasets for development and testing
Scenario
You want to create a sample dataset for development or testing, for example, a dataset containing a subset of data or specific record types.
Solution
Implement your transformation logic in a single notebook or a shared set of notebooks. Then create separate notebooks that define multiple datasets based on environment. For example, in production, create a notebook that defines the complete set of data for your pipeline:
CREATE OR REFRESH STREAMING LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")
Then create notebooks that define a sample of data based on requirements. For example, to generate a small dataset with specific records for testing:
CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
You can also filter data to create a subset of the production data for development or testing:
CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
To use these different datasets, create multiple pipelines with the notebooks implementing the transformation logic. Each pipeline can read data from the LIVE.input_data dataset but is configured to include the notebook that creates the dataset specific to the environment.
Programmatically manage and create multiple live tables
Scenario
You have pipelines containing multiple flows or dataset definitions that differ only by a small number of parameters. This redundancy results in pipelines that are error-prone and difficult to maintain. For example, the following diagram shows the graph of a pipeline that uses a fire department dataset to find neighborhoods with the fastest response times for different categories of emergency calls. In this example, the parallel flows differ by only a few parameters.

You can use a metaprogramming pattern to reduce the overhead of generating and maintaining redundant flow definitions. Metaprogramming in Delta Live Tables is done using Python inner functions. Because these functions are lazily evaluated, you can use them to create flows that are identical except for input parameters. Each invocation can include a different set of parameters that controls how each table should be generated, as shown in the following example:
import dlt
import functools
from pyspark.sql.functions import *
@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )
all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_received - ts_responded)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dlt.table(
  name="best_neighborhoods",
  comment="which neighborhood appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x, y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )
Quarantine invalid data
Scenario
You’ve defined expectations to filter out records that violate data quality constraints, but you also want to save the invalid records for analysis.
Solution
Create rules that are the inverse of the expectations you’ve defined and use those rules to save the invalid records to a separate table. You can programmatically create these inverse rules. The following example creates the valid_farmers_market table containing input records that pass the valid_website and valid_location data quality constraints, and also creates the invalid_farmers_market table containing the records that fail those constraints:
import dlt

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# concatenate inverse rules
quarantine_rules["invalid_record"] = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="valid_farmers_market"
)
@dlt.expect_all_or_drop(rules)
def get_valid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

@dlt.table(
  name="invalid_farmers_market"
)
@dlt.expect_all_or_drop(quarantine_rules)
def get_invalid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )
A disadvantage of the above approach is that it generates the quarantine table by processing the data twice. If you don’t want this performance overhead, you can use the constraints directly within a query to generate a column indicating the validation status of a record. You can then partition the table by this column for further optimization.
This approach does not use expectations, so data quality metrics do not appear in the event logs or the pipelines UI.
import dlt
from pyspark.sql.functions import expr

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="partitioned_farmers_market",
  partition_cols = [ 'Quarantine' ]
)
def get_partitioned_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .withColumn("Quarantine", expr(quarantine_rules))
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime",
              "Quarantine")
  )
Validate row counts across tables
Scenario
You need to compare row counts between two live tables, perhaps to verify that data was processed successfully without dropping rows.
Solution
Add an additional table to your pipeline that defines an expectation to perform the comparison. The results of this expectation appear in the event log and the Delta Live Tables UI. This example validates equal row counts between the tbla and tblb tables:
CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Retain manual deletes or updates
Scenario
Your application requires arbitrary deletion or updates of records in a table and recomputation of all downstream tables. The following diagram illustrates two streaming live tables:
raw_user_table ingests a set of raw user data from a source.
bmi_table incrementally computes BMI scores using weight and height from raw_user_table.
To comply with privacy requirements, you need to delete or update a user record from the raw_user_table and recompute the bmi_table.

You can manually delete or update the record from raw_user_table and do a refresh operation to recompute the downstream tables. However, you need to make sure the deleted record isn’t reloaded from the source data.
Solution
Use the pipelines.reset.allowed table property to disable full refresh for raw_user_table so that intended changes are retained over time:
CREATE OR REFRESH STREAMING LIVE TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING LIVE TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);
Setting pipelines.reset.allowed to false prevents refreshes to raw_user_table, but does not prevent incremental writes to the tables or prevent new data from flowing into the table.
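The delete itself is a standard Delta operation run outside the pipeline, for example from a separate notebook or the SQL editor. The following is a minimal sketch, assuming a hypothetical target schema named iot_pipeline and that the record to remove is identified by userid:
# Run outside the pipeline; the target schema name and the userid value are placeholders.
spark.sql("DELETE FROM iot_pipeline.raw_user_table WHERE userid = 42")

# Then start a pipeline update so downstream tables such as bmi_table are recomputed.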
Exclude tables from publishing
Use secrets in a pipeline
Scenario
You need to authenticate to a data source from your pipeline, for example, cloud data storage or a database, and don’t want to include credentials in your notebook or configuration.
Solution
Use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration.
The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.
To learn more about working with Azure Data Lake Storage Gen2, see Accessing Azure Data Lake Storage Gen2 and Blob Storage with Databricks.
Note
You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.
{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "label": "default",
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    },
    {
      "label": "maintenance",
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}
Replace
<storage-account-name> with the ADLS Gen2 storage account name.
<scope-name> with the Databricks secret scope name.
<secret-name> with the name of the key containing the Azure storage account access key.
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"

@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )
Replace
<container-name> with the name of the Azure storage account container that stores the input data.
<storage-account-name> with the ADLS Gen2 storage account name.
<path-to-input-dataset> with the path to the input dataset.
Define limits on pipeline clusters
Scenario
You want to restrict configuration options for clusters that run Delta Live Tables pipelines; for example, you want to control costs by limiting cluster size or simplify cluster configuration by providing pre-defined cluster templates.
Solution
Cluster policies allow you to define templates that limit user access to cluster configuration. You can define one or more cluster policies to use when configuring pipelines.
To create a cluster policy for Delta Live Tables pipelines, define a cluster policy with the cluster_type field set to dlt. The following example creates a minimal policy for a Delta Live Tables cluster:
{
  "cluster_type": {
    "type": "fixed",
    "value": "dlt"
  },
  "num_workers": {
    "type": "unlimited",
    "defaultValue": 3,
    "isOptional": true
  },
  "node_type_id": {
    "type": "unlimited",
    "isOptional": true
  },
  "spark_version": {
    "type": "unlimited",
    "hidden": true
  }
}
For more information on creating cluster policies, including example policies, see Create a cluster policy.
To use a cluster policy in a pipeline configuration, you need the policy ID. To find the policy ID:
Click Compute in the sidebar.
Click the Cluster Policies tab.
Click the policy name.
Copy the policy ID in the ID field.
The policy ID is also available at the end of the URL when you click on the policy name.
To use the cluster policy with a pipeline:
Note
When using cluster policies to configure Delta Live Tables clusters, Databricks recommends applying a single policy to both the default and maintenance clusters.
Click Workflows in the sidebar and click the Delta Live Tables tab. The Pipelines list displays.
Click the pipeline name. The Pipeline details page appears.
Click the Settings button. The Edit Pipeline Settings dialog appears.
Click the JSON button.
In the clusters setting, set the policy_id field to the value of the policy ID. The following example configures the default and maintenance clusters using the cluster policy with the ID C65B864F02000008:
{
  "clusters": [
    {
      "label": "default",
      "policy_id": "C65B864F02000008",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    },
    {
      "label": "maintenance",
      "policy_id": "C65B864F02000008"
    }
  ]
}
Click Save.