Delta Live Tables cookbook

Preview

This feature is in Public Preview. To sign up for access, see Request Access to Delta Live Tables.

This article contains a collection of recommendations and solutions to implement common tasks in your Delta Live Tables pipelines.

Make expectations portable and reusable

Scenario

You want to apply a common set of data quality rules to multiple tables, or the team members who develop and maintain data quality rules are separate from the pipeline developers.

Solution

Maintain data quality rules separately from your pipeline implementations. Store the rules in a format that is reliable and easy to access and update, for example, a text file stored in DBFS or cloud storage, or a Delta table. The following example uses a CSV file named rules.csv stored in DBFS to maintain rules. Each rule in rules.csv is categorized by a tag. You use this tag in dataset definitions to determine which rules to apply:

name,constraint,tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained

The following Python example defines data quality expectations based on the rules stored in the rules.csv file. The get_rules() function reads the rules from rules.csv and returns a Python dictionary containing rules matching the tag argument passed to the function. The dictionary is passed to the @dlt.expect_all_*() decorators to enforce data quality constraints. For example, any records failing the rules tagged with validity are dropped from the raw_farmers_market table:

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from csv file
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )
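
If you store your rules in a Delta table instead of a CSV file, only the loading logic in get_rules() changes. The following sketch is a hypothetical variant that reads from a table named data_quality_rules with the same name, constraint, and tag columns:

def get_rules_from_delta(tag):
  """
    Loads data quality rules from a Delta table.
    The table name is a placeholder; adjust it to your environment.
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  df = spark.read.table("data_quality_rules")
  return {row['name']: row['constraint'] for row in df.filter(col("tag") == tag).collect()}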

Use Python UDFs in SQL

Scenario

You want the simplicity of SQL to define Delta Live Tables datasets but need transformations not directly supported in SQL.

Solution

Use a Python user-defined function (UDF) in your SQL queries. The following example defines the square() function, registers it as the makeItSquared SQL UDF, and calls makeItSquared() in a SQL query.

  1. Define and register the UDF.

    Create a notebook with Default Language set to Python and add the following in a cell:

    def square(i: int) -> int:
      """
        Simple udf for squaring the parameter passed.
        :param i: column from Pyspark or SQL
        :return: squared value of the passed param.
      """
      return i * i
    
    spark.udf.register("makeItSquared", square) # register the square udf for Spark SQL
    
  2. Call the UDF.

    Create a SQL notebook and add the following query in a cell:

    CREATE LIVE TABLE raw_squared
    AS SELECT makeItSquared(2) AS numSquared;
    
  3. Create a pipeline.

    Create a new Delta Live Tables pipeline, adding the notebooks you created to Notebook Libraries. Use the Add notebook library button in the Create Pipeline dialog to add additional notebooks, or use the libraries field in the Delta Live Tables settings to configure the notebooks.
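
    For example, the libraries field in the pipeline settings might contain one entry per notebook; the notebook paths below are placeholders for the Python and SQL notebooks you created:

    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@example.com/dlt-udf-definitions"
        }
      },
      {
        "notebook": {
          "path": "/Users/user@example.com/dlt-sql-queries"
        }
      }
    ]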

Use MLflow models in a Delta Live Tables pipeline

Scenario

You want to use an MLflow-trained model in a pipeline.

Solution

To use an MLflow model in a Delta Live Tables pipeline:

  1. Obtain the run ID and model name of the MLflow model. The run ID and model name are used to construct the URI of the MLflow model. One way to look up the run ID programmatically is shown after this list.
  2. Use the URI to define a Spark UDF to load the MLflow model.
  3. Call the UDF in your table definitions to use the MLflow model.
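
You can copy the run ID and model name from the MLflow UI, or look up the run ID programmatically. The following sketch shows one way, using mlflow.search_runs(); the experiment ID is a placeholder:

import mlflow

# mlflow.search_runs() returns a pandas DataFrame with a run_id column.
# The experiment ID below is a placeholder; select the run you want to use.
runs = mlflow.search_runs(experiment_ids=["<experiment-id>"])
run_id = runs.iloc[0]["run_id"]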

The following example defines a Spark UDF named loaded_model that loads an MLflow model trained on loan risk data. The loaded_model UDF is then used to define the gtb_scoring_train_data and gtb_scoring_valid_data tables:

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return (
    dlt.read("train_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return (
    dlt.read("valid_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )

Create sample datasets for development and testing

Scenario

You want to create a sample dataset for development or testing, for example, a dataset containing a subset of data or specific record types.

Solution

Implement your transformation logic in a single notebook or a shared set of notebooks. Then create separate notebooks to define multiple datasets based on environment. For example, in production, create a notebook that defines the complete set of data for your pipeline:

CREATE INCREMENTAL LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

Then create notebooks that define a sample of data based on requirements. For example, to generate a small dataset with specific records for testing:

CREATE LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

You can also filter data to create a subset of the production data for development or testing:

CREATE LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

To use these different datasets, create multiple pipelines with the notebooks implementing the transformation logic. Each pipeline reads data from the LIVE.input_data dataset but is configured to include only the notebook that creates the dataset specific to its environment.
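
For example, a shared transformation notebook can reference the dataset by name without knowing which environment defined it. The following sketch is a minimal example; the cleaned_sensor_data table name and the sensor_reading filter are hypothetical:

import dlt
from pyspark.sql.functions import col

@dlt.table(
  name="cleaned_sensor_data",
  comment="shared transformation logic that reads whichever input_data the pipeline defines"
)
def cleaned_sensor_data():
  return dlt.read("input_data").filter(col("sensor_reading") > 0)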

Programmatically manage and create multiple live tables

Scenario

You have pipelines containing multiple flows or dataset definitions that differ only by a small number of parameters. This redundancy results in pipelines that are error-prone and difficult to maintain. For example, the following diagram shows the graph of a pipeline that uses a fire department dataset to find neighborhoods with the fastest response times for different categories of emergency calls. In this example, the parallel flows differ by only a few parameters.

Fire dataset flow diagram

You can use a metaprogramming pattern to reduce the overhead of generating and maintaining redundant flow definitions. Metaprogramming in Delta Live Tables is done using Python inner functions. Because these functions are lazily evaluated, you can use them to create flows that are identical except for input parameters. Each invocation can include a different set of parameters that controls how each table should be generated, as shown in the following example:

import functools
import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG(ts_responded - ts_received) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )