Tutorial: Run your first Delta Live Tables pipeline

This tutorial shows you how to configure a Delta Live Tables pipeline from code in a Databricks notebook and run the pipeline by triggering a pipeline update. This tutorial includes an example pipeline to ingest and process a sample dataset with example code using the Python and SQL interfaces. You can also use the instructions in this tutorial to create a pipeline from any notebook that contains properly defined Delta Live Tables syntax.

You can configure Delta Live Tables pipelines and trigger updates using the Databricks workspace UI or automated tooling options such as the API, CLI, Databricks Asset Bundles, or as a task in a Databricks workflow. To familiarize yourself with the functionality and features of Delta Live Tables, Databricks recommends first using the UI to create and run pipelines. Additionally, when you configure a pipeline in the UI, Delta Live Tables generates a JSON configuration for your pipeline that can be used to implement your programmatic workflows.

To demonstrate Delta Live Tables functionality, the examples in this tutorial download a publicly available dataset. Pipelines that implement real-world use cases typically rely on one of the several other ways that Databricks provides to connect to data sources and ingest data. See Ingest data with Delta Live Tables.

Requirements

  • To start a pipeline, you must have cluster creation permission or access to a cluster policy defining a Delta Live Tables cluster. The Delta Live Tables runtime creates a cluster before it runs your pipeline and fails if you don’t have the correct permission.

  • To use the examples in this tutorial, your workspace must have Unity Catalog enabled.

  • You must have the following permissions in Unity Catalog:

    • READ VOLUME and WRITE VOLUME, or ALL PRIVILEGES, for the my-volume volume.

    • USE SCHEMA or ALL PRIVILEGES for the default schema.

    • USE CATALOG or ALL PRIVILEGES for the main catalog.

    To set these permissions, ask your Databricks administrator or see Unity Catalog privileges and securable objects.

  • The examples in this tutorial use a Unity Catalog volume to store sample data. To use these examples, create a volume and use that volume’s catalog, schema, and volume names to set the volume path used by the examples.

Note

If your workspace does not have Unity Catalog enabled, notebooks with examples that do not require Unity Catalog are attached to this article. To use these examples, select Hive metastore as the storage option when you create the pipeline.

Where do you run Delta Live Tables queries?

Delta Live Tables queries are primarily implemented in Databricks notebooks, but Delta Live Tables is not designed to be run interactively in notebook cells. Executing a cell that contains Delta Live Tables syntax in a Databricks notebook results in an error message. To run your queries, you must configure your notebooks as part of a pipeline.

Important

  • You cannot rely on the cell-by-cell execution ordering of notebooks when writing queries for Delta Live Tables. Delta Live Tables evaluates and runs all code defined in notebooks but has a different execution model than a notebook Run all command.

  • You cannot mix languages in a single Delta Live Tables source code file. For example, a notebook can contain only Python queries or SQL queries. If you must use multiple languages in a pipeline, use multiple language-specific notebooks or files in the pipeline.

You can also use Python code stored in files. For example, you can create a Python module that can be imported into your Python pipelines or define Python user-defined functions (UDFs) to use in SQL queries. To learn about importing Python modules, see Import Python modules from Git folders or workspace files. To learn about using Python UDFs, see User-defined scalar functions - Python.

Example: Ingest and process New York baby names data

The example in this article uses a publicly available dataset that contains records of New York State baby names. The example demonstrates using a Delta Live Tables pipeline to:

  • Read raw CSV data from a publicly available dataset into a table.

  • Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.

  • Use the cleansed records as input to Delta Live Tables queries that create derived datasets.

This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?

Implementations of this example are provided for the Python and SQL interfaces. You can follow the steps to create new notebooks that contain the example code, or you can skip ahead to Create a pipeline and use one of the notebooks provided on this page.

Implement a Delta Live Tables pipeline with Python

Python functions that create Delta Live Tables datasets must return DataFrames, a concept familiar to users with PySpark or pandas API on Spark experience. For users unfamiliar with DataFrames, Databricks recommends using the SQL interface. See Implement a Delta Live Tables pipeline with SQL.

All Delta Live Tables Python APIs are implemented in the dlt module. Your Delta Live Tables pipeline code implemented with Python must explicitly import the dlt module at the top of Python notebooks and files. Delta Live Tables differs from many Python scripts in a key way: you do not call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all files loaded into a pipeline and builds a dataflow graph.
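
The declarative model described above can be illustrated with a toy sketch in plain Python. This is not the dlt module and makes no claim about how Delta Live Tables is implemented; the names table, read, registry, and build_graph are hypothetical and exist only for this illustration. It shows the general idea that a decorator can register a function without the author ever calling it, and that a framework can later call the registered functions to discover dependencies.

```python
# Toy model only: this is NOT the dlt module. All names here are hypothetical.
registry = {}       # table name -> function that defines the table
dependencies = {}   # table name -> set of upstream table names
_current = []       # stack holding the table currently being evaluated


def table(func=None, *, name=None, comment=None):
    """Register a function as a table definition without calling it.

    The table name defaults to the function name; `name` overrides it.
    `comment` is accepted but ignored in this toy model.
    """
    def register(f):
        registry[name or f.__name__] = f
        return f
    return register(func) if func is not None else register


def read(upstream):
    """Record that the table being evaluated depends on `upstream`."""
    dependencies[_current[-1]].add(upstream)
    return []  # stand-in for a DataFrame


def build_graph():
    """The framework, not the author, calls each registered function."""
    for table_name, f in registry.items():
        dependencies[table_name] = set()
        _current.append(table_name)
        f()
        _current.pop()
    return dependencies


@table(comment="raw rows")
def baby_names_raw():
    return []


@table(name="prepared")  # overrides the default name
def baby_names_prepared():
    return read("baby_names_raw")


graph = build_graph()
print(graph)
```

Note that neither decorated function is called by the code that defines it. The second table is registered as prepared rather than baby_names_prepared because the name argument takes precedence over the function name.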

To implement the example in this tutorial, copy and paste the following Python code into a new Python notebook. You should add each example code snippet to its own cell in the notebook in the order described. To review options for creating notebooks, see Create a notebook.

Note

When you create a pipeline with the Python interface, by default, table names are defined by function names. For example, the following Python example creates three tables named baby_names_raw, baby_names_prepared, and top_baby_names_2021. You can override the table name using the name parameter. See Create a Delta Live Tables materialized view or streaming table.

Import the Delta Live Tables module

All Delta Live Tables Python APIs are implemented in the dlt module. Explicitly import the dlt module at the top of Python notebooks and files.

The following example shows this import, alongside import statements for pyspark.sql.functions.

import dlt
from pyspark.sql.functions import *

Download the data

To get the data for this example, you download a CSV file and store it in the volume as follows:

import os

# Volume location and dataset details, stored as environment variables
# so that later cells in the notebook can reuse them.
os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

# Copy the CSV file from the download URL into the volume.
dbutils.fs.cp(
  os.environ["DATASET_DOWNLOAD_URL"],
  os.environ["UNITY_CATALOG_VOLUME_PATH"] + os.environ["DATASET_DOWNLOAD_FILENAME"]
)

Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.
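
If you prefer to assemble the volume path from its parts rather than editing the string by hand, a small helper such as the following can do it. This is a minimal sketch: the function name volume_file_path is hypothetical, and the example values main, default, and my-volume are the names used in the requirements section, so substitute your own.

```python
import posixpath


def volume_file_path(catalog, schema, volume, filename):
    """Build /Volumes/<catalog>/<schema>/<volume>/<filename>."""
    for part in (catalog, schema, volume, filename):
        # Reject empty parts and parts that would change the path depth.
        if not part or "/" in part:
            raise ValueError(f"invalid path component: {part!r}")
    return posixpath.join("/Volumes", catalog, schema, volume, filename)


print(volume_file_path("main", "default", "my-volume", "rows.csv"))
# prints /Volumes/main/default/my-volume/rows.csv
```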

Create a table from files in object storage

Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.

The @dlt.table decorator tells Delta Live Tables to create a table that contains the result of a DataFrame returned by a function. Add the @dlt.table decorator before any Python function definition that returns a Spark DataFrame to register a new table in Delta Live Tables. The following example demonstrates using the function name as the table name and adding a descriptive comment to the table:

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

Add a table from an upstream dataset in the pipeline

You can use dlt.read() to read data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )
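
The two expectations above behave differently when a record violates them: expect keeps the record and reports the violation, while expect_or_fail stops the update. The following toy sketch models those actions, plus dropping, on plain Python dictionaries. It is not the dlt API; the function apply_expectation, the action names, and the sample rows are hypothetical and exist only to illustrate the semantics.

```python
# Toy model of expectation actions on plain dicts. This is NOT the dlt API.
def apply_expectation(rows, name, predicate, action="warn"):
    """Return (kept_rows, metrics) for a single expectation.

    action "warn" keeps failing rows and only counts them,
    action "drop" removes failing rows,
    action "fail" raises if any row fails.
    """
    failed = [row for row in rows if not predicate(row)]
    if failed and action == "fail":
        raise ValueError(f"expectation {name!r} violated by {failed[0]}")
    if action == "drop":
        kept = [row for row in rows if predicate(row)]
    else:
        kept = list(rows)
    metrics = {"name": name, "passed": len(rows) - len(failed), "failed": len(failed)}
    return kept, metrics


# Hypothetical sample rows, not taken from the real dataset.
rows = [
    {"First_Name": "OLIVIA", "Count": 12},
    {"First_Name": None, "Count": 5},
    {"First_Name": "LIAM", "Count": 0},
]

# Like @dlt.expect: the row with a missing name is kept but counted.
kept, metrics = apply_expectation(
    rows, "valid_first_name", lambda r: r["First_Name"] is not None
)
print(len(kept), metrics["failed"])  # prints 3 1

# Like @dlt.expect_or_fail: a non-positive count stops processing.
try:
    apply_expectation(rows, "valid_count", lambda r: r["Count"] > 0, action="fail")
except ValueError as err:
    print("update would fail:", err)
```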

Create a table with enriched data views

Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by declaring tables with specific business logic.

Tables in Delta Live Tables are equivalent conceptually to materialized views. Whereas traditional views on Spark run logic each time the view is queried, a Delta Live Tables table stores the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables contain the most recent version of data available.

The table defined by the following code demonstrates the conceptual similarity to a materialized view derived from upstream data in your pipeline:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )
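
To see what the filter, group, aggregate, sort, and limit steps above compute, it can help to run the same logic on a few rows in plain Python. This is only a sketch of the transformation, not pipeline code: the function top_names and the sample rows are hypothetical. A name can appear in several rows for the same year because the source data also contains County and Sex columns, which is why the counts are summed per name.

```python
from collections import Counter

# Hypothetical sample rows, not taken from the real dataset.
sample = [
    {"Year_Of_Birth": 2021, "First_Name": "OLIVIA", "Count": 10},
    {"Year_Of_Birth": 2021, "First_Name": "LIAM", "Count": 7},
    {"Year_Of_Birth": 2021, "First_Name": "OLIVIA", "Count": 5},
    {"Year_Of_Birth": 2020, "First_Name": "LIAM", "Count": 99},
]


def top_names(rows, year, limit=10):
    """Sum Count per First_Name for one year and return the largest totals."""
    totals = Counter()
    for row in rows:
        if row["Year_Of_Birth"] == year:             # filter
            totals[row["First_Name"]] += row["Count"]  # group and sum
    return totals.most_common(limit)                 # sort descending, limit


print(top_names(sample, 2021))  # prints [('OLIVIA', 15), ('LIAM', 7)]
```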

To configure a pipeline that uses the notebook, see Create a pipeline.

Implement a Delta Live Tables pipeline with SQL

Databricks recommends Delta Live Tables with SQL as the preferred way for SQL users to build new ETL, ingestion, and transformation pipelines on Databricks. The SQL interface for Delta Live Tables extends standard Spark SQL with many new keywords, constructs, and table-valued functions. These additions to standard SQL allow users to declare dependencies between datasets and deploy production-grade infrastructure without learning new tooling or additional concepts.

For users who are familiar with Spark DataFrames and need support for more extensive testing and for operations that are difficult to implement with SQL, such as metaprogramming operations, Databricks recommends using the Python interface. See Implement a Delta Live Tables pipeline with Python.

Download the data

To get the data for this example, copy the following code, paste it into a new notebook, and then run the notebook. To review options for creating notebooks, see Create a notebook.

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.

Create a table from files in Unity Catalog

For the rest of this example, copy the following SQL snippets and paste them into a new SQL notebook, separate from the notebook in the previous section. You should add each example SQL snippet to its own cell in the notebook in the order described.

Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.

All Delta Live Tables SQL statements use CREATE OR REFRESH syntax and semantics. When you update a pipeline, Delta Live Tables determines whether the logically correct result for the table can be accomplished through incremental processing or if full recomputation is required.

The following example creates a table by loading data from the CSV file stored in the Unity Catalog volume:

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.

Add a table from an upstream dataset to the pipeline

You can use the live virtual schema to query data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The live schema is a custom keyword implemented in Delta Live Tables that can be substituted for a target schema if you want to publish your datasets. See Use Unity Catalog with your Delta Live Tables pipelines and Publish data from Delta Live Tables to the Hive metastore.

The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;
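
The idea that references through the live schema define an execution order can be sketched in plain Python. This is an illustration of dependency resolution in general, not a description of how Delta Live Tables parses SQL: the regular expression, the abbreviated statements, and the function upstream_tables are all assumptions made for this example.

```python
import re
from graphlib import TopologicalSorter  # available in Python 3.9 and later

# Abbreviated, hypothetical statements keyed by the table each one defines.
statements = {
    "top_baby_names_sql_2021": "SELECT First_Name FROM live.baby_names_sql_prepared",
    "baby_names_sql_prepared": "SELECT First_Name FROM live.baby_names_sql_raw",
    "baby_names_sql_raw": "SELECT * FROM read_files('/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv')",
}


def upstream_tables(sql):
    """Return the table names referenced through the live schema."""
    return set(re.findall(r"\blive\.(\w+)", sql, flags=re.IGNORECASE))


# Map each table to the tables it reads from, then order them so that
# every table comes after the tables it depends on.
graph = {name: upstream_tables(sql) for name, sql in statements.items()}
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Although the statements are listed in reverse, the computed order starts with the raw table and ends with the aggregated table, which mirrors why the order of cells in a notebook does not determine the order in which tables are updated.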

Create an enriched data view

Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by declaring tables with specific business logic.

Live tables are equivalent conceptually to materialized views. Whereas traditional views on Spark run logic each time the view is queried, live tables store the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables contain the most recent version of data available.

The following code creates an enriched materialized view of upstream data:

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

To configure a pipeline that uses the notebook, continue to Create a pipeline.

Create a pipeline

Delta Live Tables creates pipelines by resolving dependencies defined in notebooks or files (called source code or libraries) using Delta Live Tables syntax. Each source code file can contain only one language, but you can mix libraries of different languages in your pipeline.

  1. Click Delta Live Tables in the sidebar and click Create Pipeline.

  2. Give the pipeline a name.

  3. (Optional) Select a product edition.

  4. Select Triggered for Pipeline Mode.

  5. Configure one or more notebooks containing the source code for the pipeline. In the Paths textbox, enter the path to a notebook or click the file picker icon to select a notebook.

  6. Select a destination for datasets published by the pipeline, either the Hive metastore or Unity Catalog. See Publish datasets.

    • Hive metastore:

      • (Optional) Enter a Storage location for output data from the pipeline. The system uses a default location if you leave Storage location empty.

      • (Optional) Specify a Target schema to publish your dataset to the Hive metastore.

    • Unity Catalog: Specify a Catalog and a Target schema to publish your dataset to Unity Catalog.

  7. (Optional) Configure compute settings for the pipeline. To learn about options for compute settings, see Configure pipeline settings for Delta Live Tables.

  8. (Optional) Click Add notification to configure one or more email addresses to receive notifications for pipeline events. See Add email notifications for pipeline events.

  9. (Optional) Configure advanced settings for the pipeline. To learn about options for advanced settings, see Configure pipeline settings for Delta Live Tables.

  10. Click Create.

The system displays the Pipeline Details page after you click Create. You can also access your pipeline by clicking the pipeline name in the Delta Live Tables tab.
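
The choices made in the steps above end up in a JSON configuration for the pipeline. The following sketch builds a comparable settings document in Python. Treat the field names (name, libraries, continuous, development, catalog, target) as assumptions and compare them with the JSON that the UI shows for your own pipeline before relying on them. The function pipeline_settings and the notebook path are hypothetical.

```python
import json


def pipeline_settings(name, notebook_path, catalog=None, target=None,
                      continuous=False, development=False):
    """Assemble a pipeline settings dictionary (field names are assumptions)."""
    settings = {
        "name": name,
        # One entry per notebook that contains pipeline source code.
        "libraries": [{"notebook": {"path": notebook_path}}],
        # False is intended to correspond to the Triggered pipeline mode.
        "continuous": continuous,
        # False is intended to correspond to Production execution mode.
        "development": development,
    }
    if catalog:
        settings["catalog"] = catalog   # Unity Catalog destination
    if target:
        settings["target"] = target     # target schema for published tables
    return settings


settings = pipeline_settings(
    name="baby-names-tutorial",
    notebook_path="/Users/someone@example.com/dlt-baby-names",  # hypothetical
    catalog="main",
    target="default",
)
print(json.dumps(settings, indent=2))
```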

Start a pipeline update

To start an update for a pipeline, click the Start button in the top panel. The system returns a message confirming that your pipeline is starting.

After successfully starting the update, the Delta Live Tables system:

  1. Starts a cluster using a cluster configuration created by the Delta Live Tables system. You can also specify a custom cluster configuration.

  2. Creates any tables that don’t exist and ensures that the schema is correct for any existing tables.

  3. Updates tables with the latest data available.

  4. Shuts down the cluster when the update is complete.

Note

Execution mode is set to Production by default, which deploys ephemeral compute resources for each update. You can use Development mode to change this behavior, allowing the same compute resources to be used for multiple pipeline updates during development and testing. See Development and production modes.
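
Updates can also be started from automated tooling instead of the UI. The following sketch prepares, but does not send, an HTTP request to start an update. It assumes the pipelines REST endpoint has the form /api/2.0/pipelines/<pipeline-id>/updates and accepts a full_refresh field, so check the API reference for your workspace before using it. The function build_start_update_request, the workspace URL, the token placeholder, and the pipeline ID are hypothetical.

```python
import json
import urllib.request


def build_start_update_request(host, token, pipeline_id, full_refresh=False):
    """Prepare a POST request that asks for a new pipeline update.

    The endpoint path and the full_refresh field are assumptions to verify
    against the REST API reference.
    """
    url = f"{host.rstrip('/')}/api/2.0/pipelines/{pipeline_id}/updates"
    body = json.dumps({"full_refresh": full_refresh}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


request = build_start_update_request(
    host="https://example.cloud.databricks.com",  # hypothetical workspace URL
    token="<personal-access-token>",              # placeholder, never hard-code
    pipeline_id="1234-abcd",                      # hypothetical pipeline ID
)
print(request.get_method(), request.full_url)

# To send the request from a trusted environment:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```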

Publish datasets

You can make Delta Live Tables datasets available for querying by publishing tables to the Hive metastore or Unity Catalog. If you do not specify a target for publishing data, tables created in Delta Live Tables pipelines can only be accessed by other operations in that same pipeline. See Publish data from Delta Live Tables to the Hive metastore and Use Unity Catalog with your Delta Live Tables pipelines.

Example source code notebooks

You can import these notebooks into a Databricks workspace and use them to deploy a Delta Live Tables pipeline. See Create a pipeline.

Get started with Delta Live Tables Python notebook


Get started with Delta Live Tables SQL notebook


Example source code notebooks for workspaces without Unity Catalog

You can import these notebooks into a Databricks workspace without Unity Catalog enabled and use them to deploy a Delta Live Tables pipeline. See Create a pipeline.

Get started with Delta Live Tables Python notebook


Get started with Delta Live Tables SQL notebook
