Programmatically create multiple tables

You can use Python with Delta Live Tables to create multiple tables programmatically, which reduces code redundancy.

You might have pipelines containing multiple flows or dataset definitions that differ only by a small number of parameters. This redundancy results in pipelines that are error-prone and difficult to maintain. For example, the following diagram shows the graph of a pipeline that uses a fire department dataset to find neighborhoods with the fastest response times for different categories of emergency calls. In this example, the parallel flows differ by only a few parameters.

Fire dataset flow diagram
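To make the idea concrete, the following is a minimal sketch in plain Python rather than Delta Live Tables code. The `registry` dictionary, the `register` decorator, the `generate_flow` function, and the `calls` table name are hypothetical stand-ins for a framework that collects decorated functions and runs them later. The sketch only illustrates how a few parameters held as data can replace several near-identical definitions.

```python
# Hypothetical stand-in for a framework that stores functions and calls them later.
registry = {}

def register(name):
    """Return a decorator that stores a function under `name` without calling it."""
    def decorator(fn):
        registry[name] = fn
        return fn
    return decorator

def generate_flow(table_name, call_type):
    """Define one flow; the inner function captures this call's parameters."""
    @register(table_name)
    def build():
        # `calls` is an illustrative table name, not part of the pipeline above.
        return f"SELECT * FROM calls WHERE call_type = '{call_type}'"

# The only values that differ between the parallel flows, kept as data.
FLOW_PARAMS = [
    ("alarms_table", "Alarms"),
    ("fire_table", "Structure Fire"),
    ("medical_table", "Medical Incident"),
]

for table_name, call_type in FLOW_PARAMS:
    generate_flow(table_name, call_type)

# Nothing has run yet; the stored functions execute only when invoked here.
queries = {name: fn() for name, fn in registry.items()}
```

Wrapping the definition in `generate_flow` gives each inner function its own copy of the parameters. If the decorated function were defined directly in the loop body, every stored function would read the loop variables' final values when it eventually ran.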

Delta Live Tables metaprogramming with Python example

You can use a metaprogramming pattern to reduce the overhead of generating and maintaining redundant flow definitions. Metaprogramming in Delta Live Tables uses Python inner functions. Because these functions are lazily evaluated, you can use them to create flows that are identical except for their input parameters. Each invocation of the enclosing function can pass a different set of parameters that controls how each table is generated, as shown in the following example:

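import functools

import dlt
from pyspark.sql.functions import *

# Placeholder: the CSV location is not given in this excerpt. Set it to the
# path of the fire department calls dataset in your workspace.
FIRE_CALLS_CSV_PATH = "<path-to-fire-department-calls-csv>"

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load(FIRE_CALLS_CSV_PATH)
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      # 'Neighborhooods' is kept as written; it must match the CSV column header.
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
      .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time"
  )
  def create_response_table():
    # Response time is responded minus received, so ascending order
    # puts the fastest neighborhoods first.
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG(ts_responded - ts_received) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  # Record the response table so the summary table can read it later.
  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

# No name is passed here, so the table takes the function name.
@dlt.table(
  comment="which neighborhood appears in the best response time list the most"
)
def summary():
  # Read every generated response table and stack them into one DataFrame.
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)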
  return (