Tutorial: Run your first Delta Live Tables pipeline
This tutorial shows you how to configure a Delta Live Tables pipeline from code in a Databricks notebook and run the pipeline by triggering a pipeline update. This tutorial includes an example pipeline that ingests and processes a sample dataset, with example code using the Python and SQL interfaces. You can also use the instructions in this tutorial to create a pipeline from any notebooks that contain properly defined Delta Live Tables syntax.
You can configure Delta Live Tables pipelines and trigger updates using the Databricks workspace UI or automated tooling options such as the API, CLI, Databricks Asset Bundles, or as a task in a Databricks workflow. To familiarize yourself with the functionality and features of Delta Live Tables, Databricks recommends first using the UI to create and run pipelines. Additionally, when you configure a pipeline in the UI, Delta Live Tables generates a JSON configuration for your pipeline that can be used to implement your programmatic workflows.
To demonstrate Delta Live Tables functionality, the examples in this tutorial download a publicly available dataset. However, Databricks provides several ways to connect to data sources and ingest data for pipelines that implement real-world use cases. See Ingest data with Delta Live Tables.
Requirements
To start a pipeline, you must have cluster creation permission or access to a cluster policy defining a Delta Live Tables cluster. The Delta Live Tables runtime creates a cluster before it runs your pipeline and fails if you don’t have the correct permission.
To use the examples in this tutorial, your workspace must have Unity Catalog enabled.
You must have the following permissions in Unity Catalog:
READ VOLUME and WRITE VOLUME, or ALL PRIVILEGES, for the my-volume volume.
USE SCHEMA or ALL PRIVILEGES for the default schema.
USE CATALOG or ALL PRIVILEGES for the main catalog.
To set these permissions, see your Databricks administrator or Unity Catalog privileges and securable objects.
The examples in this tutorial use a Unity Catalog volume to store sample data. To use these examples, create a volume and use that volume’s catalog, schema, and volume names to set the volume path used by the examples.
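For example, you can create a volume from a notebook with a single SQL statement. The following is a minimal sketch that assumes you have the required privileges and uses the main catalog, default schema, and my-volume volume named in the requirements above:
# Create the Unity Catalog volume used by the examples in this tutorial.
# Assumes the main catalog and default schema listed in the requirements.
spark.sql("CREATE VOLUME IF NOT EXISTS main.default.`my-volume`")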
Note
If your workspace does not have Unity Catalog enabled, notebooks with examples that do not require Unity Catalog are attached to this article. To use these examples, select Hive metastore as the storage option when you create the pipeline.
Where do you run Delta Live Tables queries?
Delta Live Tables queries are primarily implemented in Databricks notebooks, but Delta Live Tables is not designed to be run interactively in notebook cells. Executing a cell that contains Delta Live Tables syntax in a Databricks notebook results in an error message. To run your queries, you must configure your notebooks as part of a pipeline.
Important
You cannot rely on the cell-by-cell execution ordering of notebooks when writing queries for Delta Live Tables. Delta Live Tables evaluates and runs all code defined in notebooks but has a different execution model than a notebook Run all command.
You cannot mix languages in a single Delta Live Tables source code file. For example, a notebook can contain only Python queries or SQL queries. If you must use multiple languages in a pipeline, use multiple language-specific notebooks or files in the pipeline.
You can also use Python code stored in files. For example, you can create a Python module that can be imported into your Python pipelines or define Python user-defined functions (UDFs) to use in SQL queries. To learn about importing Python modules, see Import Python modules from Git folders or workspace files. To learn about using Python UDFs, see User-defined scalar functions - Python.
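As a brief illustration of the module approach, the following sketch uses a hypothetical helper module (helpers/transforms.py, not part of this tutorial) that a pipeline notebook imports and applies inside a dataset definition:
# helpers/transforms.py (hypothetical module stored in a Git folder or workspace files)
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, trim

def standardize_first_name(df: DataFrame) -> DataFrame:
  # Trim surrounding whitespace from the First_Name column.
  return df.withColumn("First_Name", trim(col("First_Name")))

# In a pipeline notebook, import the helper and call it from a dataset definition.
import dlt
from helpers.transforms import standardize_first_name

@dlt.table
def baby_names_standardized():
  return standardize_first_name(dlt.read("baby_names_raw"))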
Example: Ingest and process New York baby names data
The example in this article uses a publicly available dataset that contains records of New York State baby names. These examples demonstrate using a Delta Live Tables pipeline to:
Read raw CSV data from a publicly available dataset into a table.
Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.
Use the cleansed records as input to Delta Live Tables queries that create derived datasets.
This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?.
Implementations of this example are provided for the Python and SQL interfaces. You can follow the steps to create new notebooks that contain the example code, or you can skip ahead to Create a pipeline and use one of the notebooks provided on this page.
Implement a Delta Live Tables pipeline with Python
Python code that creates Delta Live Tables datasets must return DataFrames. For users unfamiliar with Python and DataFrames, Databricks recommends using the SQL interface. See Implement a Delta Live Tables pipeline with SQL.
All Delta Live Tables Python APIs are implemented in the dlt module. Your Delta Live Tables pipeline code implemented with Python must explicitly import the dlt module at the top of Python notebooks and files. Delta Live Tables differs from many Python scripts in a key way: you do not call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all files loaded into a pipeline and builds a dataflow graph.
To implement the example in this tutorial, copy and paste the following Python code into a new Python notebook. Add each example code snippet to its own cell in the notebook in the order described. To review options for creating notebooks, see Create a notebook.
When you create a pipeline with the Python interface, by default, table names are defined by function names. For example, the following Python example creates three tables named baby_names_raw, baby_names_prepared, and top_baby_names_2021. You can override the table name using the name parameter. See Create a Delta Live Tables materialized view or streaming table.
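For example, the following sketch (a hypothetical table, not one of the cells in this tutorial's pipeline) registers the table under a name that differs from the function name:
import dlt

@dlt.table(
  name="baby_names_example",  # the dataset is registered as baby_names_example, not my_python_function
  comment="Hypothetical table illustrating the name parameter."
)
def my_python_function():
  return spark.range(3)  # any function that returns a Spark DataFrame works here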
Important
To avoid unexpected behavior when your pipeline runs, do not include code that might have side effects in your functions that define datasets. To learn more, see the Python reference.
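For example, keep setup work such as the file download later in this tutorial in its own cell rather than inside a dataset-defining function, because Delta Live Tables controls when and how often those functions are evaluated. A minimal sketch of the recommended shape (hypothetical, not part of this tutorial's pipeline):
import dlt

# Keep side effects (file copies, API calls, environment changes) out of dataset
# definitions; run them in a separate cell or setup step instead.
@dlt.table(
  comment="Hypothetical dataset definition with no side effects."
)
def example_dataset():
  return dlt.read("baby_names_raw")  # only reads inputs and returns a DataFrame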
Import the Delta Live Tables module
All Delta Live Tables Python APIs are implemented in the dlt module. Explicitly import the dlt module at the top of Python notebooks and files. The following example shows this import, alongside import statements for pyspark.sql.functions.
import dlt
from pyspark.sql.functions import *
Download the data
To get the data for this example, you download a CSV file and store it in the volume as follows:
import os
os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"
dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")
Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.
Create a table from files in object storage
Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.
The @dlt.table decorator tells Delta Live Tables to create a table that contains the result of a DataFrame returned by a function. Add the @dlt.table decorator before any Python function definition that returns a Spark DataFrame to register a new table in Delta Live Tables. The following example demonstrates using the function name as the table name and adding a descriptive comment to the table:
@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column
Add a table from an upstream dataset in the pipeline
You can use dlt.read() to read data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.
@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )
Create a table with enriched data views
Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by defining tables with specific business logic.
Tables in Delta Live Tables are equivalent conceptually to materialized views. Unlike traditional views on Spark that run logic each time the view is queried, a Delta Live Tables table stores the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables contain the most recent version of data available.
The table defined by the following code demonstrates the conceptual similarity to a materialized view derived from upstream data in your pipeline:
@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )
To configure a pipeline that uses the notebook, see Create a pipeline.
Implement a Delta Live Tables pipeline with SQL
Databricks recommends Delta Live Tables with SQL as the preferred way for SQL users to build new ETL, ingestion, and transformation pipelines on Databricks. The SQL interface for Delta Live Tables extends standard Spark SQL with many new keywords, constructs, and table-valued functions. These additions to standard SQL allow users to declare dependencies between datasets and deploy production-grade infrastructure without learning new tooling or additional concepts.
For users familiar with Spark DataFrames who need support for more extensive testing and operations that are difficult to implement with SQL, such as metaprogramming operations, Databricks recommends using the Python interface. See Implement a Delta Live Tables pipeline with Python.
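As a brief example of the kind of metaprogramming the Python interface supports, the following sketch (hypothetical, and not part of this tutorial's pipeline) generates one table per county by defining dataset functions in a loop:
import dlt
from pyspark.sql.functions import col

# Hypothetical metaprogramming pattern: define one table per county of interest.
# A factory function captures the county value so each generated dataset filters correctly.
def create_county_table(county_name):
  @dlt.table(name=f"baby_names_{county_name.lower()}")
  def county_table():
    return dlt.read("baby_names_raw").filter(col("County") == county_name)

for county in ["Kings", "Queens", "Bronx"]:
  create_county_table(county)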
Download the data
To get the data for this example, copy the following code, paste it into a new notebook, and then run the notebook. To review options for creating notebooks, see Create a notebook.
%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.
Create a table from files in Unity Catalog
For the rest of this example, copy the following SQL snippets and paste them into a new SQL notebook, separate from the notebook in the previous section. Add each example SQL snippet to its own cell in the notebook in the order described.
Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.
All Delta Live Tables SQL statements use CREATE OR REFRESH syntax and semantics. When you update a pipeline, Delta Live Tables determines whether the logically correct result for the table can be accomplished through incremental processing or if full recomputation is required.
The following example creates a table by loading data from the CSV file stored in the Unity Catalog volume:
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
'/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Replace <catalog-name>, <schema-name>, and <volume-name> with the catalog, schema, and volume names for a Unity Catalog volume.
Add a table from an upstream dataset to the pipeline
You can use the live virtual schema to query data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The live schema is a custom keyword implemented in Delta Live Tables that can be substituted for a target schema if you want to publish your datasets. See Use Unity Catalog with your Delta Live Tables pipelines and Use Delta Live Tables pipelines with legacy Hive metastore.
The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM live.baby_names_sql_raw;
Create an enriched data view
Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by defining tables with specific business logic.
The following query uses a materialized view to create an enriched view from the upstream data. Unlike traditional views on Spark that run logic each time the view is queried, materialized views store the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables contain the most recent version of data available.
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
To configure a pipeline that uses the notebook, continue to Create a pipeline.
Create a pipeline
Note
Because compute resources are fully managed for serverless DLT pipelines, compute settings are not available when you select Serverless for a pipeline.
For information on eligibility and enablement for serverless DLT pipelines, see Enable serverless compute.
Delta Live Tables creates pipelines by resolving dependencies defined in notebooks or files (called source code) using Delta Live Tables syntax. Each source code file can only contain one language, but you can mix source code of different languages in your pipeline.
Click Delta Live Tables in the sidebar and click Create Pipeline.
Give the pipeline a name.
(Optional) To run your pipeline using serverless DLT pipelines, select the Serverless checkbox. When you select Serverless, the Compute settings are removed from the UI and the Budget policy setting appears. See Configure a serverless Delta Live Tables pipeline.
(Optional) Select a product edition.
Select Triggered for Pipeline Mode.
Configure one or more notebooks containing the source code for the pipeline. In the Paths textbox, enter the path to a notebook or click to select a notebook.
Select a destination for datasets published by the pipeline, either the Hive metastore or Unity Catalog. See Publish datasets.
Hive metastore:
(Optional) Enter a Storage location for output data from the pipeline. The system uses a default location if you leave Storage location empty.
(Optional) Specify a Target schema to publish your dataset to the Hive metastore.
Unity Catalog: Specify a Catalog and a Target schema to publish your dataset to Unity Catalog.
(Optional) If you have not selected Serverless, you can configure compute settings for the pipeline. To learn about options for compute settings, see Configure compute for a Delta Live Tables pipeline.
(Optional) Click Add notification to configure one or more email addresses to receive notifications for pipeline events. See Add email notifications for pipeline events.
(Optional) Configure advanced settings for the pipeline. To learn about options for advanced settings, see Configure a Delta Live Tables pipeline.
Click Create.
The Pipeline Details page appears after you click Create. You can also access your pipeline by clicking the pipeline name in the Delta Live Tables tab.
Start a pipeline update
To start an update for a pipeline, click Start in the top panel. The system returns a message confirming that your pipeline is starting.
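You can also trigger an update outside the UI, for example from automated tooling as described at the start of this tutorial. The following is a minimal sketch using the Databricks SDK for Python, assuming the databricks-sdk package is installed and authentication is configured:
# Minimal sketch: start a pipeline update with the Databricks SDK for Python.
# Assumes the databricks-sdk package is installed and authentication is configured.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
update = w.pipelines.start_update(pipeline_id="<pipeline-id>")  # replace <pipeline-id> with your pipeline's ID
print(update.update_id)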
After successfully starting the update, the Delta Live Tables system:
Starts a cluster using a cluster configuration created by the Delta Live Tables system. You can also specify a custom cluster configuration.
Creates any tables that don’t exist and ensures that the schema is correct for any existing tables.
Updates tables with the latest data available.
Shuts down the cluster when the update is complete.
Note
Execution mode is set to Production by default, which deploys ephemeral compute resources for each update. You can use Development mode to change this behavior, allowing the same compute resources to be used for multiple pipeline updates during development and testing. See Development and production modes.
Publish datasets
You can make Delta Live Tables datasets available for querying by publishing tables to the Hive metastore or Unity Catalog. If you do not specify a target for publishing data, tables created in Delta Live Tables pipelines can only be accessed by other operations in that same pipeline. See Use Delta Live Tables pipelines with legacy Hive metastore and Use Unity Catalog with your Delta Live Tables pipelines.
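For example, after a pipeline in this tutorial publishes to Unity Catalog, you can query a published table from any notebook with access to the target catalog and schema. A minimal sketch, assuming the main catalog and default schema used elsewhere in this tutorial:
# Query a dataset published by the pipeline from an ordinary notebook or job.
# Assumes the pipeline published to the main catalog and default schema.
df = spark.table("main.default.top_baby_names_2021")
display(df)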
Example source code notebooks
You can import these notebooks into a Databricks workspace and use them to deploy a Delta Live Tables pipeline. See Create a pipeline.
Example source code notebooks for workspaces without Unity Catalog
You can import these notebooks into a Databricks workspace without Unity Catalog enabled and use them to deploy a Delta Live Tables pipeline. See Create a pipeline.