Delta Live Tables cookbook

This article contains a collection of recommendations and solutions to implement common tasks in your Delta Live Tables pipelines.

Make expectations portable and reusable

Scenario

You want to apply a common set of data quality rules to multiple tables, or the team members that develop and maintain data quality rules are separate from the pipeline developers.

Solution

Maintain data quality rules separately from your pipeline implementations. Store the rules in a format that is reliable and easy to access and update, for example, a text file stored in DBFS or cloud storage, or a Delta table. The following example uses a CSV file named rules.csv stored in DBFS to maintain rules. Each rule in rules.csv is categorized by a tag. You use this tag in dataset definitions to determine which rules to apply:

name,constraint,tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained

The following Python example defines data quality expectations based on the rules stored in the rules.csv file. The get_rules() function reads the rules from rules.csv and returns a Python dictionary containing the rules matching the tag argument passed to the function. The dictionary is passed to the @dlt.expect_all_or_drop() decorator to enforce the data quality constraints. For example, any records failing the rules tagged validity are dropped from the raw_farmers_market table:

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from csv file
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )
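
If you maintain the rules in a Delta table instead of a CSV file, only the helper function needs to change. The following is a minimal sketch, assuming the rules are stored in a table named data_quality_rules with the same name, constraint, and tag columns (the table name is a placeholder):

from pyspark.sql.functions import col

def get_rules_from_delta(tag):
  """
    loads data quality rules from a Delta table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  # Read the rules from a Delta table instead of a CSV file.
  df = spark.read.table("data_quality_rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules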

Use Python UDFs in SQL

Scenario

You want the simplicity of SQL to define Delta Live Tables datasets but need transformations not directly supported in SQL.

Solution

Use a Python user-defined function (UDF) in your SQL queries. The following example defines and registers the square() UDF to return the square of the input argument and calls the square() UDF in a SQL expression.

  1. Define and register the UDF.

    Create a notebook with Default Language set to Python and add the following in a cell:

    from pyspark.sql.types import IntegerType

    def square(i: int) -> int:
      """
        Simple udf for squaring the parameter passed.
        :param i: column from Pyspark or SQL
        :return: squared value of the passed param.
      """
      return i * i

    # Register the square UDF for Spark SQL with an explicit integer return type.
    spark.udf.register("makeItSquared", square, IntegerType())
    
  2. Call the UDF.

    Create a SQL notebook and add the following query in a cell:

    CREATE OR REFRESH LIVE TABLE raw_squared
    AS SELECT makeItSquared(2) AS numSquared;
    
  3. Create a pipeline.

    Create a new Delta Live Tables pipeline and add the notebooks you created as notebook libraries. Use the Add notebook library button in the Create Pipeline dialog, or edit the libraries field in the Delta Live Tables settings, to configure the notebooks.
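
    For example, the libraries field in the pipeline settings might look like the following fragment (the notebook paths are placeholders):

    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/define-square-udf"
        }
      },
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/call-square-udf"
        }
      }
    ]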

Use MLflow models in a Delta Live Tables pipeline

Scenario

You want to use a model trained with MLflow in a pipeline.

Solution

To use an MLflow model in a Delta Live Tables pipeline:

  1. Obtain the run ID and model name of the MLflow model. The run ID and model name are used to construct the URI of the MLflow model.

  2. Use the URI to define a Spark UDF to load the MLflow model.

  3. Call the UDF in your table definitions to use the MLflow model.

The following example defines a Spark UDF named loaded_model that loads an MLflow model trained on loan risk data. The loaded_model UDF is then used to define the gtb_scoring_train_data and gtb_scoring_valid_data tables:

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return (
    dlt.read("train_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return (
    dlt.read("valid_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )
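
If the model is registered in the MLflow Model Registry, you can build the model URI from the registered model name and version instead of a run ID. The following is a minimal sketch, assuming a registered model named loan_risk_model at version 1 (both values are placeholders):

import mlflow

# Load the model from the MLflow Model Registry rather than from a run.
registry_model_uri = "models:/loan_risk_model/1"
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=registry_model_uri)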

Create sample datasets for development and testing

Scenario

You want to create a sample dataset for development or testing, for example, a dataset containing a subset of data or specific record types.

Solution

Implement your transformation logic in a single notebook or a shared set of notebooks. Then create separate notebooks that define the input dataset for each environment. For example, in production, create a notebook that defines the complete set of data for your pipeline:

CREATE OR REFRESH STREAMING LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

Then create notebooks that define a sample of data based on requirements. For example, to generate a small dataset with specific records for testing:

CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

You can also filter data to create a subset of the production data for development or testing:

CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

To use these different datasets, create multiple pipelines that share the notebooks implementing the transformation logic. Each pipeline reads data from the LIVE.input_data dataset but includes only the notebook that creates the input_data dataset specific to its environment.
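
For example, the shared transformation logic can be written once against the input_data dataset and reused unchanged in every environment. The following is a minimal Python sketch, where the cleaned_data table name and the filter condition are illustrative only:

import dlt
from pyspark.sql.functions import col

@dlt.table(
  name="cleaned_data",
  comment="Transformation logic shared across production, development, and test pipelines"
)
def cleaned_data():
  # Reads whichever version of input_data the pipeline includes:
  # the full production feed, the hand-written sample, or the filtered subset.
  return dlt.read("input_data").where(col("sensor_reading").isNotNull())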

Programmatically manage and create multiple live tables

Scenario

You have pipelines containing multiple flows or dataset definitions that differ only by a small number of parameters. This redundancy results in pipelines that are error-prone and difficult to maintain. For example, the following diagram shows the graph of a pipeline that uses a fire department dataset to find neighborhoods with the fastest response times for different categories of emergency calls. In this example, the parallel flows differ by only a few parameters.

Fire dataset flow diagram

You can use a metaprogramming pattern to reduce the overhead of generating and maintaining redundant flow definitions. Metaprogramming in Delta Live Tables is done using Python inner functions. Because these functions are lazily evaluated, you can use them to create flows that are identical except for input parameters. Each invocation can include a different set of parameters that controls how each table should be generated, as shown in the following example:

import functools

import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_responded - ts_received)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Quarantine invalid data

Scenario

You’ve defined expectations to filter out records that violate data quality constraints, but you also want to save the invalid records for analysis.

Solution

Create rules that are the inverse of the expectations you’ve defined and use those rules to save the invalid records to a separate table. You can programmatically create these inverse rules. The following example creates the valid_farmers_market table containing input records that pass the valid_website and valid_location data quality constraints and also creates the invalid_farmers_market table containing the records that fail those data quality constraints:

import dlt

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# concatenate inverse rules
quarantine_rules["invalid_record"] = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="valid_farmers_market"
)
@dlt.expect_all_or_drop(rules)
def get_valid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

@dlt.table(
  name="invalid_farmers_market"
)
@dlt.expect_all_or_drop(quarantine_rules)
def get_invalid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

A disadvantage of the above approach is that it generates the quarantine table by processing the data twice. If you don’t want this performance overhead, you can use the constraints directly within a query to generate a column indicating the validation status of a record. You can then partition the table by this column for further optimization.

This approach does not use expectations, so data quality metrics do not appear in the event logs or the pipelines UI.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# Build a single expression that is true only for records failing the constraints.
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="partitioned_farmers_market",
  partition_cols = [ 'Quarantine' ]
)
def get_partitioned_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .withColumn("Quarantine", expr(quarantine_rules))
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime",
              "Quarantine")
  )
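
Downstream datasets can then consume only the records that passed validation by filtering on the Quarantine column. The following is a minimal sketch (the table name is illustrative):

import dlt

@dlt.table(
  name="valid_farmers_market_from_partitioned"
)
def get_valid_farmers_market_from_partitioned():
  # Quarantine is true for records that failed the constraints,
  # so keep only the rows where it is false.
  return (
    dlt.read("partitioned_farmers_market")
      .where("Quarantine = false")
  )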

Validate row counts across tables

Scenario

You need to compare row counts between two live tables, perhaps to verify that data was processed successfully without dropping rows.

Solution

Add an additional table to your pipeline that defines an expectation to perform the comparison. The results of this expectation appear in the event log and the Delta Live Tables UI. This example validates equal row counts between the tbla and tblb tables:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)
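
A roughly equivalent Python version, assuming the same tbla and tblb datasets, is sketched below:

import dlt
from pyspark.sql.functions import count

@dlt.table(
  name="count_verification",
  comment="Validates equal row counts between tbla and tblb"
)
@dlt.expect("no_rows_dropped", "a_count == b_count")
def count_verification():
  # Compute one row per table containing its count, then combine the two rows
  # into a single row so the expectation can compare the values.
  return (
    dlt.read("tbla").agg(count("*").alias("a_count"))
      .crossJoin(dlt.read("tblb").agg(count("*").alias("b_count")))
  )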

Retain manual deletes or updates

Scenario

Your application requires arbitrary deletions or updates of records in a table and recomputation of all downstream tables. The following diagram illustrates two streaming live tables:

  • raw_user_table ingests a set of raw user data from a source.

  • bmi_table incrementally computes BMI scores using weight and height from raw_user_table.

To comply with privacy requirements, you need to delete or update a user record from the raw_user_table and recompute the bmi_table.

Retain data diagram

You can manually delete or update the record in raw_user_table and run a refresh to recompute the downstream tables. However, you must make sure the deleted record isn’t reloaded from the source data.

Solution

Use the pipelines.reset.allowed table property to disable full refresh for raw_user_table so that intended changes are retained over time:

CREATE OR REFRESH STREAMING LIVE TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING LIVE TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);

Setting pipelines.reset.allowed to false prevents a full refresh of raw_user_table, but does not prevent incremental writes to the table or new data from flowing into the table.
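
In Python, you can set the same property with the table_properties argument of @dlt.table. The following is a minimal sketch of the raw_user_table definition under that assumption:

import dlt

@dlt.table(
  name="raw_user_table",
  table_properties={
    # Disable full refresh so manual deletes and updates are retained.
    "pipelines.reset.allowed": "false"
  }
)
def raw_user_table():
  # Incrementally ingest the raw user data with Auto Loader.
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/iot-stream/data-user")
  )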

Exclude tables from publishing

Scenario

You’ve configured the target setting to publish your tables, but there are some tables you don’t want published.

Solution

Define a temporary table to instruct Delta Live Tables not to persist metadata for the table. The following examples define the same temporary table in SQL and in Python:

SQL

CREATE TEMPORARY LIVE TABLE customers_raw
AS SELECT * FROM json.`/data/customers/json/`

Python

@dlt.table(
  comment="Raw customer data",
  temporary=True)
def customers_raw():
  return ("...")

Use secrets in a pipeline

Scenario

You need to authenticate to a data source from your pipeline, for example, cloud data storage or a database, and don’t want to include credentials in your notebook or configuration.

Solution

Use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration.

The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.

To learn more about working with Azure Data Lake Storage Gen2, see Accessing Azure Data Lake Storage Gen2 and Blob Storage with Databricks.

Note

You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "label": "default",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5
        }
      },
      {
        "label": "maintenance",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace:

  • <storage-account-name> with the ADLS Gen2 storage account name.

  • <scope-name> with the Databricks secret scope name.

  • <secret-name> with the name of the key containing the Azure storage account access key.

import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace:

  • <container-name> with the name of the Azure storage account container that stores the input data.

  • <storage-account-name> with the ADLS Gen2 storage account name.

  • <path-to-input-dataset> with the path to the input dataset.

Define limits on pipeline clusters

Scenario

You want to restrict configuration options for clusters that run Delta Live Tables pipelines, for example, to control costs by limiting cluster size or to simplify cluster configuration by providing predefined cluster templates.

Solution

Cluster policies allow you to define templates that limit user access to cluster configuration. You can define one or more cluster policies to use when configuring pipelines.

To create a cluster policy for Delta Live Tables pipelines, define a cluster policy with the cluster_type field set to dlt. The following example creates a minimal policy for a Delta Live Tables cluster:

{
  "cluster_type": {
    "type": "fixed",
    "value": "dlt"
  },
  "num_workers": {
    "type": "unlimited",
    "defaultValue": 3,
    "isOptional": true
  },
  "node_type_id": {
    "type": "unlimited",
    "isOptional": true
  },
  "spark_version": {
    "type": "unlimited",
    "hidden": true
  }
}

For more information on creating cluster policies, including example policies, see Create a cluster policy.

To use a cluster policy in a pipeline configuration, you need the policy ID. To find the policy ID:

  1. Click compute icon Compute in the sidebar.

  2. Click the Cluster Policies tab.

  3. Click the policy name.

  4. Copy the policy ID in the ID field.

The policy ID is also available at the end of the URL when you click on the policy name.

To use the cluster policy with a pipeline:

Note

When using cluster policies to configure Delta Live Tables clusters, Databricks recommends applying a single policy to both the default and maintenance clusters.

  1. Click Jobs Icon Workflows in the sidebar and click the Delta Live Tables tab. The Pipelines list displays.

  2. Click the pipeline name. The Pipeline details page appears.

  3. Click the Settings button. The Edit Pipeline Settings dialog appears.

  4. Click the JSON button.

  5. In the clusters setting, set the policy_id field to the value of the policy ID. The following example configures the default and maintenance clusters using the cluster policy with the ID C65B864F02000008:

    {
      "clusters": [
        {
          "label": "default",
          "policy_id": "C65B864F02000008",
          "autoscale": {
            "min_workers": 1,
            "max_workers": 5
          }
        },
        {
          "label": "maintenance",
          "policy_id": "C65B864F02000008"
        }
      ]
    }
    
  6. Click Save.