This tutorial shows you how to use Python syntax to declare a data pipeline in Delta Live Tables. Users familiar with PySpark or Pandas for Spark can use DataFrames with Delta Live Tables. For users unfamiliar with Spark DataFrames, Databricks recommends using SQL for Delta Live Tables. See Tutorial: Declare a data pipeline with SQL in Delta Live Tables.
Python syntax for Delta Live Tables extends standard PySpark with a set of decorator functions imported through the dlt module.
You cannot mix languages within a Delta Live Tables source code file. You can use multiple notebooks or files with different languages in a pipeline.
To use the code in this example, select Hive metastore as the storage option when you create the pipeline. Because this example reads data from DBFS, you cannot run this example with a pipeline configured to use Unity Catalog as the storage option.
You can use notebooks or Python files to write Delta Live Tables Python queries, but Delta Live Tables is not designed to be run interactively in notebook cells.
Delta Live Tables differs from many Python scripts in a key way: you do not call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all files loaded into a pipeline and builds a dataflow graph.
You cannot rely on the cell-by-cell execution ordering of notebooks when writing Python for Delta Live Tables. Delta Live Tables evaluates and runs all code defined in notebooks, but has an entirely different execution model than a notebook Run all command.
Executing a cell that contains Delta Live Tables syntax in a Databricks notebook results in an error message. To learn about configuring pipelines with Delta Live Tables, see Tutorial: Run your first Delta Live Tables pipeline.
This tutorial demonstrates using Python syntax to declare a Delta Live Tables pipeline on a dataset containing Wikipedia clickstream data to:
Read the raw JSON clickstream data into a table.
Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.
Use the records from the cleansed data table to make Delta Live Tables queries that create derived datasets.
This code demonstrates a simplified example of the medallion architecture. See What is the medallion lakehouse architecture?.
Copy the Python code and paste it into a new Python notebook. You can add the example code to a single cell of the notebook or multiple cells. To review options for creating notebooks, see Create a notebook.
When you create a pipeline with the Python interface, by default, table names are defined by function names. For example, the following Python example creates three tables named clickstream_raw, clickstream_prepared, and top_spark_referrers. You can override the table name using the name parameter. See Create a Delta Live Tables materialized view or streaming table.
All Delta Live Tables Python APIs are implemented in the dlt module. Explicitly import the dlt module at the top of Python notebooks and files.
The following example shows this import, alongside import statements for pyspark.sql.functions:
import dlt
from pyspark.sql.functions import *
You can define Python variables and functions alongside Delta Live Tables code in notebooks. All Python logic runs as Delta Live Tables resolves the pipeline graph.
The following code declares a text variable used in a later step to load a JSON data file:
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
Delta Live Tables supports loading data from all formats supported by Databricks. See Interact with external data on Databricks.
The @dlt.table decorator tells Delta Live Tables to create a table that contains the result of a DataFrame returned by a function. Add the @dlt.table decorator before any Python function definition that returns a Spark DataFrame to register a new table in Delta Live Tables. The following example demonstrates using the function name as the table name and adding a descriptive comment to the table:
@dlt.table(
    comment="The raw wikipedia clickstream dataset, ingested from /databricks-datasets."
)
def clickstream_raw():
    return spark.read.format("json").load(json_path)
You can use dlt.read() to read data from other datasets declared in your current Delta Live Tables pipeline. Declaring new tables in this way creates a dependency that Delta Live Tables automatically resolves before executing updates. The following code also includes examples of monitoring and enforcing data quality with expectations. See Manage data quality with Delta Live Tables.
@dlt.table(
    comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
    return (
        dlt.read("clickstream_raw")
        .withColumn("click_count", expr("CAST(n AS INT)"))
        .withColumnRenamed("curr_title", "current_page_title")
        .withColumnRenamed("prev_title", "previous_page_title")
        .select("current_page_title", "click_count", "previous_page_title")
    )
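The two expectation decorators in this table behave differently when a record violates its constraint: @dlt.expect keeps the record and reports the violation, while @dlt.expect_or_fail stops the update. The following plain-Python sketch models that difference on a list of dictionaries. It is an illustration only: `apply_expectation`, its `on_violation` labels, and the sample rows are hypothetical and are not part of the dlt module or the clickstream dataset.

```python
def apply_expectation(rows, predicate, on_violation="warn"):
    """Return (kept_rows, failed_count) under a toy model of expectations.

    on_violation: "warn" keeps invalid rows, "drop" removes them,
    "fail" raises an error. These labels are hypothetical.
    """
    failed = [r for r in rows if not predicate(r)]
    if failed and on_violation == "fail":
        raise ValueError(f"{len(failed)} record(s) violated the expectation")
    if on_violation == "drop":
        kept = [r for r in rows if predicate(r)]
    else:
        kept = list(rows)
    return kept, len(failed)

# Made-up sample rows, not taken from the clickstream dataset.
rows = [
    {"current_page_title": "Apache_Spark", "click_count": 12},
    {"current_page_title": None, "click_count": 3},
]

# "warn" mode: the invalid row is retained and only counted.
kept, failed = apply_expectation(
    rows, lambda r: r["current_page_title"] is not None
)
print(len(kept), failed)  # 2 1

# "fail" mode: passes here because every click_count is positive.
apply_expectation(rows, lambda r: r["click_count"] > 0, on_violation="fail")
```

In this model, a row with a missing title still reaches the target table, while a single non-positive click count would abort the whole run.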
Because Delta Live Tables processes updates to pipelines as a series of dependency graphs, you can declare highly enriched views that power dashboards, BI, and analytics by declaring tables with specific business logic.
Delta Live Tables tables are conceptually equivalent to materialized views. Whereas traditional views on Spark execute logic each time the view is queried, Delta Live Tables tables store the most recent version of query results in data files. Because Delta Live Tables manages updates for all datasets in a pipeline, you can schedule pipeline updates to match latency requirements for materialized views and know that queries against these tables return the most recent version of data available.
The table defined by the following code demonstrates the conceptual similarity to a materialized view derived from upstream data in your pipeline:
@dlt.table(
    comment="A table containing the top pages linking to the Apache Spark page."
)
def top_spark_referrers():
    return (
        dlt.read("clickstream_prepared")
        .filter(expr("current_page_title == 'Apache_Spark'"))
        .withColumnRenamed("previous_page_title", "referrer")
        .sort(desc("click_count"))
        .select("referrer", "click_count")
        .limit(10)
    )
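To make the transformation steps concrete, the following sketch applies the same filter, rename, sort, and limit logic to plain Python dictionaries. It does not use Spark or the dlt module. The `top_referrers` helper and the sample rows are made up for illustration and only mimic the shape of the clickstream_prepared table.

```python
# Made-up rows shaped like the clickstream_prepared table.
prepared = [
    {"current_page_title": "Apache_Spark",
     "previous_page_title": "Apache_Hadoop", "click_count": 40},
    {"current_page_title": "Apache_Spark",
     "previous_page_title": "other-google", "click_count": 90},
    {"current_page_title": "Python",
     "previous_page_title": "other-google", "click_count": 500},
]

def top_referrers(rows, page, limit=10):
    """Hypothetical helper mirroring the filter/sort/select/limit chain."""
    # filter: keep only rows whose current page matches.
    matching = [r for r in rows if r["current_page_title"] == page]
    # sort: highest click_count first.
    ranked = sorted(matching, key=lambda r: r["click_count"], reverse=True)
    # rename + select + limit: expose only referrer and click_count.
    return [
        {"referrer": r["previous_page_title"], "click_count": r["click_count"]}
        for r in ranked[:limit]
    ]

top = top_referrers(prepared, "Apache_Spark")
print([r["referrer"] for r in top])  # ['other-google', 'Apache_Hadoop']
```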
To learn more, see Delta Live Tables Python language reference.