Delta Live Tables cookbook

This article contains a collection of recommendations and solutions to implement common tasks in your Delta Live Tables pipelines.

Make expectations portable and reusable

Scenario

You want to apply a common set of data quality rules to multiple tables, or the team members who develop and maintain data quality rules are separate from the pipeline developers.

Solution

Maintain data quality rules separately from your pipeline implementations. Store the rules in a format that is reliable and easy to access and update, for example, a text file stored in DBFS or cloud storage, or a Delta table. The following example uses a CSV file named rules.csv stored in DBFS to maintain rules. Each rule in rules.csv is categorized by a tag. You use this tag in dataset definitions to determine which rules to apply:

name,constraint,tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained

The following Python example defines data quality expectations based on the rules stored in the rules.csv file. The get_rules() function reads the rules from rules.csv and returns a Python dictionary containing the rules that match the tag argument passed to the function. The dictionary is passed to the @dlt.expect_all_*() decorators to enforce data quality constraints. For example, any records failing the rules tagged with validity are dropped from the raw_farmers_market table:

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from csv file
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )
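
If you maintain the rules in a Delta table instead of a CSV file, only the loading logic changes. The following sketch assumes a hypothetical Delta table named quality_rules with the same name, constraint, and tag columns:

from pyspark.sql.functions import col

def get_rules_from_delta(tag):
  """
    loads data quality rules from a Delta table (the table name quality_rules is an assumption)
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("quality_rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules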

Use Python UDFs in SQL

Scenario

You want the simplicity of SQL to define Delta Live Tables datasets but need transformations not directly supported in SQL.

Solution

Use a Python user-defined function (UDF) in your SQL queries. The following example defines and registers the square() UDF to return the square of the input argument and calls the square() UDF in a SQL expression.

  1. Define and register the UDF.

    Create a notebook with Default Language set to Python and add the following in a cell:

    from pyspark.sql.types import IntegerType

    def square(i: int) -> int:
      """
        Simple UDF that returns the square of the value passed in.
        :param i: column value from PySpark or SQL
        :return: squared value of the passed param
      """
      return i * i

    # Register the UDF for Spark SQL. Specifying the return type ensures the result
    # is an integer rather than the default string type.
    spark.udf.register("makeItSquared", square, IntegerType())
    
  2. Call the UDF.

    Create a SQL notebook and add the following query in a cell:

    CREATE OR REFRESH LIVE TABLE raw_squared
    AS SELECT makeItSquared(2) AS numSquared;
    
  3. Create a pipeline.

    Create a new Delta Live Tables pipeline and add both notebooks you created as notebook libraries. To add notebooks, use the Add notebook library button in the Create Pipeline dialog or the libraries field in the Delta Live Tables settings.

Use MLflow models in a Delta Live Tables pipeline

Scenario

You want to use an MLflow trained model in a pipeline.

Solution

To use an MLflow model in a Delta Live Tables pipeline:

  1. Obtain the run ID and model name of the MLflow model. The run ID and model name are used to construct the URI of the MLflow model.

  2. Use the URI to define a Spark UDF to load the MLflow model.

  3. Call the UDF in your table definitions to use the MLflow model.

The following example defines a Spark UDF named loaded_model that loads an MLflow model trained on loan risk data. The loaded_model UDF is then used to define the gtb_scoring_train_data and gtb_scoring_valid_data tables:

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML scored training dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_train_data():
  return (
    dlt.read("train_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )

@dlt.table(
  comment="GBT ML scored valid dataset based on Loan Risk",
  table_properties={
    "quality": "gold"
  }
)
def gtb_scoring_valid_data():
  return (
    dlt.read("valid_data")
      .withColumn('predictions', loaded_model(struct(features)))
  )
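
Continuing the example above (mlflow is already imported), if the model is registered in the MLflow Model Registry you can typically reference it with a models: URI instead of a runs: URI. The registered model name and version below are placeholders:

# Hypothetical alternative: load a model registered in the MLflow Model Registry.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri="models:/<registered-model-name>/<model-version>")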

Create sample datasets for development and testing

Scenario

You want to create a sample dataset for development or testing, for example, a dataset containing a subset of data or specific record types.

Solution

Implement your transformation logic in a single or shared set of notebooks. Then create separate notebooks to define multiple datasets based on environment. For example, in production, create a notebook that defines the complete set of data for your pipeline:

CREATE OR REFRESH STREAMING LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")

Then create notebooks that define a sample of data based on requirements. For example, to generate a small dataset with specific records for testing:

CREATE OR REFRESH LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

You can also filter data to create a subset of the production data for development or testing:

CREATE OR REFRESH LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

To use these different datasets, create multiple pipelines with the notebooks implementing the transformation logic. Each pipeline can read data from the LIVE.input_data dataset but is configured to include the notebook that creates the dataset specific to the environment.
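
For example, a shared transformation notebook might contain a dataset definition like the following sketch. The aggregation and the date and sensor_reading columns assume the sample data shown above and are illustrative only:

import dlt
from pyspark.sql.functions import avg

@dlt.table(
  name="daily_sensor_summary",
  comment="Example transformation that works with any environment-specific input_data definition"
)
def daily_sensor_summary():
  return (
    dlt.read("input_data")
      .groupBy("date")
      .agg(avg("sensor_reading").alias("avg_reading"))
  )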

Programmatically manage and create multiple live tables

Scenario

You have pipelines containing multiple flows or dataset definitions that differ only by a small number of parameters. This redundancy results in pipelines that are error-prone and difficult to maintain. For example, the following diagram shows the graph of a pipeline that uses a fire department dataset to find neighborhoods with the fastest response times for different categories of emergency calls. In this example, the parallel flows differ by only a few parameters.

Fire dataset flow diagram

You can use a metaprogramming pattern to reduce the overhead of generating and maintaining redundant flow definitions. Metaprogramming in Delta Live Tables is done using Python inner functions. Because these functions are lazily evaluated, you can use them to create flows that are identical except for input parameters. Each invocation can include a different set of parameters that controls how each table should be generated, as shown in the following example:

import dlt
import functools
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="top level tables by call type"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="top 10 neighborhoods with fastest response time "
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG(ts_responded - ts_received) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="which neighbor appears in the best response time list the most"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

Quarantine invalid data

Scenario

You’ve defined expectations to filter out records that violate data quality constraints, but you also want to save the invalid records for analysis.

Solution

Create rules that are the inverse of the expectations you’ve defined and use those rules to save the invalid records to a separate table. You can programmatically create these inverse rules. The following example creates the valid_farmers_market table containing input records that pass the valid_website and valid_location data quality constraints and also creates the invalid_farmers_market table containing the records that fail those data quality constraints:

import dlt

rules = {}
quarantine_rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# concatenate inverse rules
quarantine_rules["invalid_record"] = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="valid_farmers_market"
)
@dlt.expect_all_or_drop(rules)
def get_valid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

@dlt.table(
  name="invalid_farmers_market"
)
@dlt.expect_all_or_drop(quarantine_rules)
def get_invalid_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
  )

A disadvantage of the above approach is that it generates the quarantine table by processing the data twice. If you don’t want this performance overhead, you can use the constraints directly within a query to generate a column indicating the validation status of a record. You can then partition the table by this column for further optimization.

This approach does not use expectations, so data quality metrics do not appear in the event logs or the pipelines UI.

import dlt
from pyspark.sql.functions import expr

rules = {}

rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"

# Combine the rules into a single inverse expression that flags invalid records
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="partitioned_farmers_market",
  partition_cols = [ 'Quarantine' ]
)
def get_partitioned_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .withColumn("Quarantine", expr(quarantine_rules))
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime",
              "Quarantine")
  )
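
Downstream tables can then select only the records they need by filtering on the Quarantine column. The following definition, appended to the example above, is a sketch; the table name is illustrative:

@dlt.table(
  name="valid_farmers_market_data"
)
def get_valid_farmers_market_data():
  return (
    dlt.read("partitioned_farmers_market")
      .filter("Quarantine = false")
      .drop("Quarantine")
  )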

Validate row counts across tables

Scenario

You need to compare row counts between two live tables, perhaps to verify that data was processed successfully without dropping rows.

Solution

Add an additional table to your pipeline that defines an expectation to perform the comparison. The results of this expectation appear in the event log and the Delta Live Tables UI. This example validates equal row counts between the tbla and tblb tables:

CREATE OR REFRESH LIVE TABLE count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)
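
A roughly equivalent check can be defined in Python. The following sketch assumes the same tbla and tblb upstream datasets:

import dlt
from pyspark.sql.functions import count

@dlt.table(name="count_verification")
@dlt.expect("no_rows_dropped", "a_count == b_count")
def count_verification():
  a_count = dlt.read("tbla").agg(count("*").alias("a_count"))
  b_count = dlt.read("tblb").agg(count("*").alias("b_count"))
  return a_count.crossJoin(b_count)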

Retain manual deletes or updates

Scenario

Your application requires arbitrary deletion or updates of records in a table and recomputation of all downstream tables. The following diagram illustrates two streaming live tables:

  • raw_user_table ingests a set of raw user data from a source.

  • bmi_table incrementally computes BMI scores using weight and height from raw_user_table.

To comply with privacy requirements, you need to delete or update a user record from the raw_user_table and recompute the bmi_table.

Retain data diagram

You can manually delete or update the record from raw_user_table and do a refresh operation to recompute the downstream tables. However, you need to make sure the deleted record isn’t reloaded from the source data.

Solution

Use the pipelines.reset.allowed table property to disable full refresh for raw_user_table so that intended changes are retained over time:

CREATE OR REFRESH STREAMING LIVE TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING LIVE TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);

Setting pipelines.reset.allowed to false prevents a full refresh of raw_user_table but does not prevent incremental writes to the table or prevent new data from flowing into the table.
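
In Python, the same property can be set through the table_properties argument of the @dlt.table decorator. The following is a sketch of the raw_user_table definition above:

import dlt

@dlt.table(
  name="raw_user_table",
  table_properties={
    "pipelines.reset.allowed": "false"
  }
)
def raw_user_table():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/iot-stream/data-user")
  )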

Exclude tables from publishing

Scenario

You’ve configured the target setting to publish your tables, but there are some tables you don’t want published.

Solution

Define a temporary table to instruct Delta Live Tables not to persist metadata for the table. In SQL:

CREATE TEMPORARY LIVE TABLE customers_raw
AS SELECT * FROM json.`/data/customers/json/`

In Python:

@dlt.table(
  comment="Raw customer data",
  temporary=True)
def customers_raw():
  return ("...")

Use secrets in a pipeline

Scenario

You need to authenticate to a data source from your pipeline, for example, cloud data storage or a database, and don’t want to include credentials in your notebook or configuration.

Solution

Use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration.

The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.

To learn more about working with Azure Data Lake Storage Gen2, see Access Azure Data Lake Storage Gen2 and Blob Storage.

Note

You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "label": "default",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      },
      {
        "label": "maintenance",
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> with the ADLS Gen2 storage account name.

  • <scope-name> with the Databricks secret scope name.

  • <secret-name> with the name of the key containing the Azure storage account access key.

import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> with the name of the Azure storage account container that stores the input data.

  • <storage-account-name> with the ADLS Gen2 storage account name.

  • <path-to-input-dataset> with the path to the input dataset.

Define limits on pipeline clusters

Scenario

You want to restrict configuration options for clusters that run Delta Live Tables pipelines; for example, you want to control costs by limiting cluster size or simplify cluster configuration by providing pre-defined cluster templates.

Solution

Cluster policies allow you to define templates that limit user access to cluster configuration. You can define one or more cluster policies to use when configuring pipelines.

To create a cluster policy for Delta Live Tables pipelines, define a cluster policy with the cluster_type field set to dlt. The following example creates a minimal policy for a Delta Live Tables cluster:

{
  "cluster_type": {
    "type": "fixed",
    "value": "dlt"
  },
  "num_workers": {
    "type": "unlimited",
    "defaultValue": 3,
    "isOptional": true
  },
  "node_type_id": {
    "type": "unlimited",
    "isOptional": true
  },
  "spark_version": {
    "type": "unlimited",
    "hidden": true
  }
}

For more information on creating cluster policies, including example policies, see Create a cluster policy.

To use a cluster policy in a pipeline configuration, you need the policy ID. To find the policy ID:

  1. Click compute icon Compute in the sidebar.

  2. Click the Cluster Policies tab.

  3. Click the policy name.

  4. Copy the policy ID in the ID field.

The policy ID is also available at the end of the URL when you click on the policy name.

To use the cluster policy with a pipeline:

Note

When using cluster policies to configure Delta Live Tables clusters, Databricks recommends applying a single policy to both the default and maintenance clusters.

  1. Click Jobs Icon Workflows in the sidebar and click the Delta Live Tables tab. The Pipelines list displays.

  2. Click the pipeline name. The Pipeline details page appears.

  3. Click the Settings button. The Edit Pipeline Settings dialog appears.

  4. Click the JSON button.

  5. In the clusters setting, set the policy_id field to the value of the policy ID. The following example configures the default and maintenance clusters using the cluster policy with the ID C65B864F02000008:

    {
      "clusters": [
        {
          "label": "default",
          "policy_id": "C65B864F02000008",
          "autoscale": {
            "min_workers": 1,
            "max_workers": 5,
            "mode": "ENHANCED"
          }
        },
        {
          "label": "maintenance",
          "policy_id": "C65B864F02000008"
        }
      ]
    }
    
  6. Click Save.

Perform advanced validation with Delta Live Tables expectations

Scenario

You need to perform complex data quality checks, for example, ensuring a derived table contains all records from the source table or guaranteeing the equality of a numeric column across tables.

Solution

You can define live tables using aggregate and join queries and use the results of those queries as part of your expectation checking. The following example validates that all expected records are present in the report table:

CREATE LIVE TABLE report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

The following example uses an aggregate to ensure the uniqueness of a primary key:

CREATE LIVE TABLE report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

To prevent persistence of the table used in the validation, use the TEMPORARY keyword in the table definition.
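
The same pattern can be expressed in Python. The following sketch combines the primary key check with a temporary table so the validation results are not persisted:

import dlt
from pyspark.sql.functions import count

@dlt.table(
  name="report_pk_tests",
  temporary=True
)
@dlt.expect("unique_pk", "num_entries = 1")
def report_pk_tests():
  return (
    dlt.read("report")
      .groupBy("pk")
      .agg(count("*").alias("num_entries"))
  )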

Use Azure Event Hubs as a data source

Scenario

Your pipeline needs to process messages from Azure Event Hubs. However, you cannot use the Structured Streaming Event Hubs connector because this library is not available as part of Databricks Runtime, and Delta Live Tables does not allow you to use third-party JVM libraries.

Solution

Azure Event Hubs provides an endpoint compatible with Apache Kafka that you can use with the Structured Streaming Kafka connector, available in Databricks Runtime, to process messages from Azure Event Hubs. For more information about Azure Event Hubs and Apache Kafka compatibility, see Use Azure Event Hubs from Apache Kafka applications.

The following steps describe connecting a Delta Live Tables pipeline to an existing Event Hubs instance and consuming events from a topic. To complete these steps, you need the following Event Hubs connection values:

  • The name of the Event Hubs namespace.

  • The name of the Event Hub instance in the Event Hubs namespace.

  • A shared access policy name and policy key for Event Hubs. By default, a RootManageSharedAccessKey policy is created for each Event Hubs namespace. This policy has manage, send, and listen permissions. If your pipeline only reads from Event Hubs, Databricks recommends creating a new policy with listen permission only.

For more information about the Event Hubs connection string, see Get an Event Hubs connection string.

Note

  • Azure Event Hubs provides both OAuth 2.0 and shared access signature (SAS) options to authorize access to your secure resources. These instructions use SAS-based authentication.

  • If you get the Event Hubs connection string from the Azure portal, it may not contain the EntityPath value. The EntityPath value is required only when using the Structured Streaming Event Hubs connector. Using the Structured Streaming Kafka connector requires providing only the topic name.

Store the policy key in a Databricks secret

Because the policy key is sensitive information, Databricks recommends not hardcoding the value in your pipeline code. Instead, use Databricks secrets to store and manage access to the key.

The following example uses the Databricks CLI to create a secret scope and store the key in that secret scope. In your pipeline code, use the dbutils.secrets.get() function with the scope-name and shared-policy-name to retrieve the key value.

databricks --profile <profile-name> secrets create-scope --scope <scope-name>

databricks --profile <profile-name> secrets put --scope <scope-name> --key <shared-policy-name> --string-value <shared-policy-key>

For more information on Databricks secrets, see Secret management. For more information on using the Secrets CLI, see Secrets CLI.
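
In your pipeline code, the retrieval is then a single call (shown in context in the full example below); the scope and key names must match the values used with the CLI:

policy_key = dbutils.secrets.get(scope="<scope-name>", key="<shared-policy-name>")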

Create a notebook and add the pipeline code to consume events

The following example reads IoT events from a topic, but you can adapt the example for the requirements of your application. As a best practice, Databricks recommends using the Delta Live Tables pipeline settings to configure application variables. Your pipeline code then uses the spark.conf.get() function to retrieve values. For more information on using pipeline settings to parameterize your pipeline, see Parameterize pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("iot.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Create the pipeline

Create a new pipeline with the following settings, replacing the placeholder values with appropriate values for your environment.

{
  "clusters": [
    {
      "label": "default",
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> with the name of an Azure storage account container.

  • <storage-account-name> with the name of an ADLS Gen2 storage account.

  • <eh-namespace> with the name of your Event Hubs namespace.

  • <eh-policy-name> with the secret scope key for the Event Hubs policy key.

  • <eventhub> with the name of your Event Hubs instance.

  • <secret-scope-name> with the name of the Databricks secret scope that contains the Event Hubs policy key.

As a best practice, this pipeline doesn’t use the default DBFS storage path but instead uses an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account. For more information on configuring authentication for an ADLS Gen2 storage account, see Use secrets in a pipeline.

Import Python modules from a Databricks repo

Scenario

You want to store and maintain your Python code in a common location, for example, to access code stored in source control or code that you want shared across pipelines.

Solution

Use Databricks Repos to store your Python code as modules or packages. You can then import the Python code in your pipeline notebooks. For more information about managing files in a Databricks repo, see Work with Python and R modules.

The following example adapts the example from the Delta Live Tables quickstart by importing dataset queries as a Python module from a repo. To run this example, use the following steps:

  1. To create a repo for your Python code, click Repos Icon Repos in the sidebar and click Add Repo.

  2. Deselect Create repo by cloning a Git repository and enter a name for the repo in Repository name, for example, dlt-quickstart-repo.

  3. Create a module to read source data into a table: click the down arrow next to the repo name, select Create > File, and enter a name for the file, for example, clickstream_raw_module.py. The file editor opens. Enter the following in the editor window:

    from dlt import *
    
    json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
    
    def create_clickstream_raw_table(spark):
      @table
      def clickstream_raw():
        return (
          spark.read.json(json_path)
        )
    
  4. Create a module to create a new table containing prepared data: select Create > File again and enter a name for the file, for example, clickstream_prepared_module.py. Enter the following in the new editor window:

    from clickstream_raw_module import *
    from dlt import read
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    def create_clickstream_prepared_table(spark):
      create_clickstream_raw_table(spark)
      @table
      @expect("valid_current_page_title", "current_page_title IS NOT NULL")
      @expect_or_fail("valid_count", "click_count > 0")
      def clickstream_prepared():
        return (
          read("clickstream_raw")
            .withColumn("click_count", expr("CAST(n AS INT)"))
            .withColumnRenamed("curr_title", "current_page_title")
            .withColumnRenamed("prev_title", "previous_page_title")
            .select("current_page_title", "click_count", "previous_page_title")
        )
    
  5. Create a pipeline notebook: go to your Databricks landing page and select Create a notebook, or click New Icon New in the sidebar and select Notebook. The Create Notebook dialog appears. You can also create the notebook in the repo by clicking the down arrow next to the repo name and selecting Create > Notebook.

  6. In the Create Notebook dialog, give your notebook a name and select Python from the Default Language dropdown menu. You can leave Cluster set to the default value.

  7. Click Create.

  8. Enter the example code in the notebook.

    If you created the notebook in the workspace or a repo path that’s different from the Python modules path, enter the following code in the first cell of the notebook:

    import sys, os
    sys.path.append(os.path.abspath('<repo-path>'))
    
    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    create_clickstream_prepared_table(spark)
    
    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    

    Replace <repo-path> with the path to the Databricks repo containing the Python modules to import.

    If you created your pipeline notebook in the same repo as the modules you’re importing, you do not need to specify the repo path with sys.path.append. Enter the following code in the first cell of the notebook:

    import dlt
    from clickstream_prepared_module import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    create_clickstream_prepared_table(spark)

    @dlt.table(
      comment="A table containing the top pages linking to the Apache Spark page."
    )
    def top_spark_referrers():
      return (
        dlt.read("clickstream_prepared")
          .filter(expr("current_page_title == 'Apache_Spark'"))
          .withColumnRenamed("previous_page_title", "referrer")
          .sort(desc("click_count"))
          .select("referrer", "click_count")
          .limit(10)
      )
    
  9. Create a pipeline using the new notebook.

  10. To run the pipeline, in the Pipeline details page, click Start.

You can also import Python code as a package. The following code snippet from a Delta Live Tables notebook imports the test_utils package from the dlt_packages directory inside the same repo root as the notebook. The dlt_packages directory contains the files test_utils.py and __init__.py, and test_utils.py defines the function create_test_table():

import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)
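
For reference, a minimal dlt_packages/test_utils.py might look like the following sketch; the table name, schema, and values are illustrative only:

import dlt

def create_test_table(spark):
  @dlt.table(
    name="test_table",
    comment="Small fixed dataset for development and testing"
  )
  def test_table():
    return spark.createDataFrame(
      [("2021/09/04", 22.4), ("2021/09/05", 21.5)],
      ["date", "sensor_reading"]
    )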