Tutorial: Declare a data pipeline with Python in Delta Live Tables
This tutorial shows you how to use Python syntax to declare a data pipeline in Delta Live Tables. Users familiar with PySpark or the pandas API on Spark can use DataFrames with Delta Live Tables. For users unfamiliar with Spark DataFrames, Databricks recommends using SQL for Delta Live Tables. See Tutorial: Declare a data pipeline with SQL in Delta Live Tables.
Python syntax for Delta Live Tables extends standard PySpark with a set of decorator functions imported through the dlt module.
You cannot mix languages within a Delta Live Tables source code file. You can use multiple notebooks or files with different languages in a pipeline.
Where do you run Delta Live Tables Python queries?
You can use notebooks or Python files to write Delta Live Tables Python queries, but Delta Live Tables is not designed to be run interactively in notebook cells.
Delta Live Tables differs from many Python scripts in a key way: you do not call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all files loaded into a pipeline and builds a dataflow graph.
You cannot rely on the cell-by-cell execution order of notebooks when writing Python for Delta Live Tables. Delta Live Tables evaluates and runs all code defined in notebooks, but its execution model is entirely different from the notebook Run all command.
Executing a cell that contains Delta Live Tables syntax in a Databricks notebook results in an error message. To learn about configuring pipelines with Delta Live Tables, see Tutorial: Run your first Delta Live Tables pipeline.
Declare a Delta Live Tables data pipeline with Python
This tutorial demonstrates using Python syntax to declare a Delta Live Tables pipeline on a dataset containing Wikipedia clickstream data to:
Read the raw JSON clickstream data into a table.
Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.
Use the records from the cleansed data table to make Delta Live Tables queries that create derived datasets.
This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?.
Copy the Python code and paste it into a new Python notebook. You can add the example code to a single cell of the notebook or multiple cells. To review options for creating notebooks, see Create a notebook.
When you create a pipeline with the Python interface, table names are defined by function names by default. For example, the following Python example creates three tables named clickstream_raw, clickstream_prepared, and top_spark_referrers. You can override the table name using the name parameter. See Create a Delta Live Tables materialized view or streaming table.
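As a rough illustration of the naming rule, the following hypothetical decorator records each function under its own name unless a name argument overrides it. This table function is a stand-in written for this sketch, not @dlt.table, and the override name spark_referrers is made up; the sketch only mirrors the default-versus-override behavior described above.

```python
# Hypothetical registry illustrating default vs. overridden dataset names.
# `table` is a stand-in written for this example, not the dlt API.
registry = {}


def table(func=None, *, name=None, comment=None):
    def register(f):
        # Default the dataset name to the function name unless overridden.
        dataset_name = name if name is not None else f.__name__
        registry[dataset_name] = {"builder": f, "comment": comment}
        return f

    # Support both the bare @table form and the parameterized @table(...) form.
    if func is not None:
        return register(func)
    return register


@table
def clickstream_raw():
    return []


@table(name="spark_referrers", comment="Renamed via the name parameter")
def top_spark_referrers():
    return []


print(sorted(registry))  # ['clickstream_raw', 'spark_referrers']
```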
Import the Delta Live Tables module
All Delta Live Tables Python APIs are implemented in the dlt module. Explicitly import the dlt module at the top of Python notebooks and files.
The following example shows this import, alongside import statements for pyspark.sql.functions.
import dlt
from pyspark.sql.functions import *
Execute arbitrary Python statements
You can define Python variables and functions alongside Delta Live Tables code in notebooks. All Python logic runs as Delta Live Tables resolves the pipeline graph.
The following code declares a text variable used in a later step to load a JSON data file:
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
Create a table from files in object storage
Delta Live Tables supports loading data from all formats supported by Databricks. See Interact with external data on Databricks.
The @dlt.table decorator tells Delta Live Tables to create a table that contains the result of a DataFrame returned by a function. Add the @dlt.table decorator before any Python function definition that returns a Spark DataFrame to register a new table in Delta Live Tables. The following example demonstrates using the function name as the table name and adding a descriptive comment to the table:
@dlt.table(
    comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
    # `spark` is the SparkSession provided by the Databricks runtime.
    return spark.read.format("json").load(json_path)
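Spark's JSON data source expects JSON Lines input by default: one complete JSON object per line, unless the multiLine option is enabled. The following sketch approximates that record layout in plain Python so you can see the shape of the raw rows. The sample records and the read_json_lines helper are invented for illustration; the field names (curr_title, prev_title, n) are the ones the next step of this tutorial reads.

```python
import io
import json

# Hypothetical sample in JSON Lines form: one JSON object per line.
# Field names match those used later in this tutorial; values are made up.
SAMPLE = """\
{"curr_title": "Apache_Spark", "prev_title": "Big_data", "n": 42}
{"curr_title": "Apache_Spark", "prev_title": "Apache_Hadoop", "n": 17}
"""


def read_json_lines(stream):
    """Parse one JSON record per non-blank line (a rough stand-in for Spark's reader)."""
    return [json.loads(line) for line in stream if line.strip()]


records = read_json_lines(io.StringIO(SAMPLE))
print(len(records), records[0]["curr_title"])  # 2 Apache_Spark
```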
Add a table from an upstream dataset in the pipeline
You can use dlt.read() to read data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.
@dlt.table(
    comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
# Keep rows with a null title, but record them in data quality metrics.
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
# Fail the update if any row has a non-positive click count.
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
    return (
        dlt.read("clickstream_raw")
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_title", "click_count", "previous_page_title")
    )
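To make the semantics of the two expectations concrete, the following hypothetical pure-Python model applies the same column mapping as clickstream_prepared to a list of dictionaries. It assumes n is always present and numeric. The prepare function, the ExpectationFailed exception, and the failed_counts dictionary are invented for this sketch; in Delta Live Tables, @dlt.expect keeps invalid records and reports them in data quality metrics, while @dlt.expect_or_fail causes the update to fail.

```python
class ExpectationFailed(Exception):
    """Hypothetical stand-in for an update failed by expect_or_fail."""


def prepare(raw_rows):
    """Model clickstream_prepared: cast, rename, then apply both expectations."""
    prepared = [
        {
            "current_page_title": row.get("curr_title"),
            "click_count": int(row["n"]),  # like CAST(n AS INT); assumes n is numeric
            "previous_page_title": row.get("prev_title"),
        }
        for row in raw_rows
    ]
    # expect: invalid rows are kept, and the number of violations is reported.
    failed_counts = {
        "valid_current_page_title": sum(
            1 for r in prepared if r["current_page_title"] is None
        )
    }
    # expect_or_fail: a single invalid row fails the whole update.
    if any(r["click_count"] <= 0 for r in prepared):
        raise ExpectationFailed("valid_count")
    return prepared, failed_counts


rows, failed_counts = prepare([
    {"curr_title": "Apache_Spark", "prev_title": "Big_data", "n": 42},
    {"curr_title": None, "prev_title": "Other_page", "n": "3"},
])
print(len(rows), failed_counts)  # 2 {'valid_current_page_title': 1}

try:
    prepare([{"curr_title": "Apache_Spark", "prev_title": "Other_page", "n": 0}])
    update_failed = False
except ExpectationFailed:
    update_failed = True
print(update_failed)  # True
```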
Create a table with enriched data views
Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by declaring tables with specific business logic.
Delta Live Tables tables are conceptually equivalent to materialized views. Whereas traditional views on Spark run their logic each time the view is queried, Delta Live Tables tables store the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match the latency requirements of your materialized views and know that queries against these tables contain the most recent version of the data available.
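The difference between a view and a materialized view can be sketched with two small hypothetical classes. They are not Spark or Delta Live Tables APIs: View re-runs its query on every read, while MaterializedView serves a stored result until refresh() runs, which here plays the role of a scheduled pipeline update.

```python
class View:
    """Hypothetical view: runs its query on every read."""

    def __init__(self, query):
        self.query = query

    def read(self):
        return self.query()


class MaterializedView:
    """Hypothetical materialized view: serves stored results until refreshed."""

    def __init__(self, query):
        self.query = query
        self.refresh()

    def refresh(self):
        # Plays the role of a scheduled pipeline update.
        self._stored = self.query()

    def read(self):
        return self._stored


source = [1, 2, 3]
view = View(lambda: sum(source))
materialized = MaterializedView(lambda: sum(source))

source.append(4)  # new upstream data arrives
print(view.read(), materialized.read())  # 10 6
materialized.refresh()
print(materialized.read())  # 10
```

The trade-off is the same one the paragraph describes: reads of the materialized result are cheap, but they are only as fresh as the last refresh.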
The table defined by the following code demonstrates the conceptual similarity to a materialized view derived from upstream data in your pipeline:
@dlt.table(
    comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
    return (
        dlt.read("clickstream_prepared")
        .filter(expr("current_page_title == 'Apache_Spark'"))
        .withColumnRenamed("previous_page_title", "referrer")
        .sort(desc("click_count"))
        .select("referrer", "click_count")
        .limit(10)
    )
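For readers less familiar with DataFrame methods, the following plain-Python function performs the same steps as top_spark_referrers on a list of dictionaries shaped like clickstream_prepared rows. It is an illustration only, and the sample rows are made up. Python's sorted() is stable, so tied click counts keep their input order here; you should not rely on any particular order of tied rows in the Spark query.

```python
from operator import itemgetter


def top_spark_referrers(prepared_rows, limit=10):
    """Plain-Python equivalent of the top_spark_referrers query."""
    # filter(current_page_title == 'Apache_Spark')
    spark_rows = [
        r for r in prepared_rows if r["current_page_title"] == "Apache_Spark"
    ]
    # withColumnRenamed("previous_page_title", "referrer") + select(...)
    renamed = [
        {"referrer": r["previous_page_title"], "click_count": r["click_count"]}
        for r in spark_rows
    ]
    # sort(desc("click_count")) then limit(10)
    return sorted(renamed, key=itemgetter("click_count"), reverse=True)[:limit]


sample = [  # made-up rows shaped like clickstream_prepared output
    {"current_page_title": "Apache_Spark", "previous_page_title": "Big_data", "click_count": 5},
    {"current_page_title": "Apache_Spark", "previous_page_title": "Apache_Hadoop", "click_count": 9},
    {"current_page_title": "Other_page", "previous_page_title": "Another_page", "click_count": 100},
]
print(top_spark_referrers(sample))
# [{'referrer': 'Apache_Hadoop', 'click_count': 9}, {'referrer': 'Big_data', 'click_count': 5}]
```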
Example using other Delta Live Tables options
Delta Live Tables materialized views and streaming tables support other options not shown in the examples above. The following example specifies the schema for the target table, including a Delta Lake generated column, and defines partition columns for the target table.
For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.
@dlt.table(
    comment="Raw data on sales",
    schema="""
        customer_id STRING,
        customer_name STRING,
        number_of_line_items STRING,
        order_datetime STRING,
        order_number LONG,
        order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
    partition_cols=["order_day_of_week"],
)
def sales():
    # Placeholder: replace with a query that returns a Spark DataFrame.
    return ("...")
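The generated column uses Spark SQL's dayofweek function, which returns an integer from 1 (Sunday) through 7 (Saturday). The following hypothetical helper, spark_dayofweek, mimics that numbering in plain Python so you can check which day-of-week value an order timestamp maps to. It assumes ISO 8601 timestamp strings, which is narrower than the range of string formats Spark can cast implicitly.

```python
from datetime import datetime


def spark_dayofweek(timestamp: str) -> int:
    """Mimic Spark SQL dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # Assumes ISO 8601 input such as "2024-01-08 09:30:00".
    moment = datetime.fromisoformat(timestamp)
    # isoweekday() is 1 = Monday ... 7 = Sunday; shift so Sunday becomes 1.
    return moment.isoweekday() % 7 + 1


for ts in ["2024-01-07", "2024-01-08 09:30:00", "2024-01-13T23:59:59"]:
    print(ts, spark_dayofweek(ts))
# 2024-01-07 1
# 2024-01-08 09:30:00 2
# 2024-01-13T23:59:59 7
```

Because the result has only seven possible values, partitioning on it produces at most seven partitions, which is one reason to follow the guidance above before adding partition columns to small tables.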