Develop pipeline code with Python
Delta Live Tables introduces several new Python code constructs for defining materialized views and streaming tables in pipelines. Python support for developing pipelines builds upon the basics of PySpark DataFrame and Structured Streaming APIs.
For users unfamiliar with Python and DataFrames, Databricks recommends using the SQL interface. See Develop pipeline code with SQL.
For a full reference of Delta Live Tables Python syntax, see Delta Live Tables Python language reference.
Basics of Python for pipeline development
Python code that creates Delta Live Tables datasets must return DataFrames.
All Delta Live Tables Python APIs are implemented in the dlt module. Your Delta Live Tables pipeline code implemented with Python must explicitly import the dlt module at the top of Python notebooks and files.
Delta Live Tables-specific Python code differs from other types of Python code in one critical way: Python pipeline code does not directly call the functions that perform data ingestion and transformation to create Delta Live Tables datasets. Instead, Delta Live Tables interprets the decorator functions from the dlt module in all source code files configured in a pipeline and builds a dataflow graph.
Important
To avoid unexpected behavior when your pipeline runs, do not include code that might have side effects in your functions that define datasets. To learn more, see the Python reference.
Create a materialized view or streaming table with Python
The @dlt.table decorator tells Delta Live Tables to create a materialized view or streaming table based on the results returned by a function. The results of a batch read create a materialized view, while the results of a streaming read create a streaming table.
By default, materialized view and streaming table names are inferred from function names. The following code example shows the basic syntax for creating a materialized view and streaming table:
Note
Both functions reference the same table in the samples catalog and use the same decorator function. These examples highlight that the only difference in the basic syntax for materialized views and streaming tables is using spark.read versus spark.readStream.
Not all data sources support streaming reads. Some data sources should always be processed with streaming semantics.
import dlt

@dlt.table()
def basic_mv():
    return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
    return spark.readStream.table("samples.nyctaxi.trips")
Optionally, you can specify the table name using the name argument in the @dlt.table decorator. The following example demonstrates this pattern for a materialized view and streaming table:
import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
    return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
    return spark.readStream.table("samples.nyctaxi.trips")
Load data from object storage
Delta Live Tables supports loading data from all formats supported by Databricks. See Data format options.
Note
These examples use data available under the /databricks-datasets directory, which is automatically mounted to your workspace. Databricks recommends using volume paths or cloud URIs to reference data stored in cloud object storage. See What are Unity Catalog volumes?.
Databricks recommends using Auto Loader and streaming tables when configuring incremental ingestion workloads against data stored in cloud object storage. See What is Auto Loader?.
The following example creates a streaming table from JSON files using Auto Loader:
import dlt

@dlt.table()
def ingestion_st():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders")
    )
The following example uses batch semantics to read a JSON directory and create a materialized view:
import dlt

@dlt.table()
def batch_mv():
    return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")
Validate data with expectations
You can use expectations to set and enforce data quality constraints. See Manage data quality with Delta Live Tables.
The following code uses @dlt.expect_or_drop to define an expectation named valid_date that drops records in which order_datetime is null or empty during data ingestion:
import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders")
    )
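To make the effect of the condition in the expectation above concrete, the following plain-Python sketch applies the same test to a few made-up records. It only approximates the SQL condition on in-memory dictionaries; the helper `is_valid_date` and the sample records are hypothetical and are not part of the dlt API or the sample dataset.

```python
# Plain-Python approximation of the expectation's SQL condition.
# The records below are invented for illustration.
def is_valid_date(record):
    """Mirror `order_datetime IS NOT NULL AND length(order_datetime) > 0`."""
    value = record.get("order_datetime")
    return value is not None and len(value) > 0


records = [
    {"order_number": 1, "order_datetime": "1564617600"},
    {"order_number": 2, "order_datetime": ""},      # empty string fails the length check
    {"order_number": 3, "order_datetime": None},    # null fails the NOT NULL check
]

kept = [r for r in records if is_valid_date(r)]
dropped_count = len(records) - len(kept)

print([r["order_number"] for r in kept])  # [1]
print(dropped_count)                      # 2
```

With an expect-or-drop style rule, only the records that satisfy the condition reach the target dataset; the rest are discarded rather than failing the update.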
Query materialized views and streaming tables defined in your pipeline
Use the LIVE schema to query other materialized views and streaming tables defined in your pipeline.
The following example defines four datasets:
- A streaming table named orders that loads JSON data.
- A materialized view named customers that loads CSV data.
- A materialized view named customer_orders that joins records from the orders and customers datasets, casts the order timestamp to a date, and selects the customer_id, order_number, state, and order_date fields.
- A materialized view named daily_orders_by_state that aggregates the daily count of orders for each state.
import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders")
    )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
    return (spark.read.table("LIVE.orders")
        .join(spark.read.table("LIVE.customers"), "customer_id")
        .select("customer_id",
            "order_number",
            "state",
            col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
        )
    )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("LIVE.customer_orders")
        .groupBy("state", "order_date")
        .count().withColumnRenamed("count", "order_count")
    )
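The cast chain and the daily aggregation in the example above can be traced by hand in plain Python. This sketch assumes that order_datetime holds epoch seconds stored as a string (the cast to int suggests this, but the source does not state it) and that dates are computed in UTC, whereas Spark uses the session time zone. The helper `to_order_date` and the sample rows are hypothetical.

```python
from collections import Counter
from datetime import date, datetime, timezone


def to_order_date(order_datetime):
    """Approximate cast("int").cast("timestamp").cast("date") for an epoch-seconds string (UTC assumed)."""
    return datetime.fromtimestamp(int(order_datetime), tz=timezone.utc).date()


# Invented (state, order_datetime) pairs standing in for joined customer_orders rows.
customer_order_rows = [
    ("CA", "1564617600"),   # 2019-08-01 00:00:00 UTC
    ("CA", "1564621200"),   # 2019-08-01 01:00:00 UTC
    ("NY", "1564704000"),   # 2019-08-02 00:00:00 UTC
]

# Equivalent in spirit to groupBy("state", "order_date").count()
order_counts = Counter(
    (state, to_order_date(ts)) for state, ts in customer_order_rows
)

for (state, order_date), order_count in sorted(order_counts.items()):
    print(state, order_date, order_count)
```

Under those assumptions, the two CA orders fall on the same day and are counted together, while the NY order forms its own group.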
Create tables in a for loop
You can use Python for loops to create multiple tables programmatically. This can be useful when you have many data sources or target datasets that vary by only a few parameters, resulting in less total code to maintain and less code redundancy.
The for loop evaluates logic in serial order, but once planning is complete for the datasets, the pipeline runs logic in parallel.
Important
When using this pattern to define datasets, ensure that the list of values passed to the for loop is always additive. If a dataset previously defined in a pipeline is omitted from a future pipeline run, that dataset is dropped automatically from the target schema.
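One way to guard against accidentally shrinking the list is to compare it with the values used in the previous run before the loop executes. The following is a minimal sketch in plain Python; the helper `removed_values` is hypothetical, and how you record the previous run's values is left open as an assumption.

```python
def removed_values(previous, current):
    """Return values present in the previous run's list but missing from the current one."""
    return sorted(set(previous) - set(current))


# Invented lists for illustration.
previous_values = ["t1", "t2", "t3"]
current_values = ["t1", "t3", "t4"]

missing = removed_values(previous_values, current_values)
print(missing)  # ['t2']

# An additive change removes nothing.
print(removed_values(previous_values, previous_values + ["t4"]))  # []
```

A non-empty result means at least one previously defined dataset would no longer be declared, which is the situation the warning above describes.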
The following example creates five tables that filter customer orders by region. Here, the region name is used to set the name of the target materialized views and to filter the source data. Temporary views are used to define joins from the source tables used in constructing the final materialized views.
import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
    orders = spark.read.table("samples.tpch.orders")
    customer = spark.read.table("samples.tpch.customer")

    return (orders.join(customer, orders.o_custkey == customer.c_custkey)
        .select(
            col("c_custkey").alias("custkey"),
            col("c_name").alias("name"),
            col("c_nationkey").alias("nationkey"),
            col("c_phone").alias("phone"),
            col("o_orderkey").alias("orderkey"),
            col("o_orderstatus").alias("orderstatus"),
            col("o_totalprice").alias("totalprice"),
            col("o_orderdate").alias("orderdate"))
    )

@dlt.view()
def nation_region():
    nation = spark.read.table("samples.tpch.nation")
    region = spark.read.table("samples.tpch.region")

    return (nation.join(region, nation.n_regionkey == region.r_regionkey)
        .select(
            col("n_name").alias("nation"),
            col("r_name").alias("region"),
            col("n_nationkey").alias("nationkey")
        )
    )

# Extract region names from region table
region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views
for region in region_list:

    @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
    def regional_customer_orders(region_filter=region):

        customer_orders = spark.read.table("LIVE.customer_orders")
        nation_region = spark.read.table("LIVE.nation_region")

        return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
            .select(
                col("custkey"),
                col("name"),
                col("phone"),
                col("nation"),
                col("region"),
                col("orderkey"),
                col("orderstatus"),
                col("totalprice"),
                col("orderdate")
            ).filter(f"region = '{region_filter}'")
        )
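The naming expression in the loop above can be checked on its own without running a pipeline. This sketch assumes the region table contains the five standard TPC-H region names; the hard-coded list is an assumption standing in for the values that the example collects from samples.tpch.region.

```python
# Assumed contents of r_name (standard TPC-H regions); the pipeline reads these from the table.
region_list = ["AFRICA", "AMERICA", "ASIA", "EUROPE", "MIDDLE EAST"]

# Same expression used in the name argument of the decorator above.
table_names = [
    f"{region.lower().replace(' ', '_')}_customer_orders"
    for region in region_list
]

for table_name in table_names:
    print(table_name)
# africa_customer_orders
# america_customer_orders
# asia_customer_orders
# europe_customer_orders
# middle_east_customer_orders
```

Lowercasing and replacing spaces keeps each generated name a simple identifier, while the unmodified region value is still used for the filter.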
[Figure: data flow graph for this pipeline]
Troubleshooting: for loop creates many tables with same values
The lazy execution model that pipelines use to evaluate Python code requires that your logic directly references individual values when the function decorated by @dlt.table() is invoked.
The following example demonstrates two correct approaches to defining tables with a for loop. In both examples, each table name from the tables list is explicitly referenced within the function decorated by @dlt.table().
import dlt

# Create a parent function to set local variables

def create_table(table_name):
    @dlt.table(name=table_name)
    def t():
        return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
    create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:
    @dlt.table(name=t_name)
    def create_table(table_name=t_name):
        return spark.read.table(table_name)
The following example does not reference values correctly. This example creates tables with distinct names, but all tables load data from the last value in the for loop:
import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:
    @dlt.table(name=t_name)
    def create_table():
        return spark.read.table(t_name)
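The underlying cause is ordinary Python closure behavior: a function body looks up a loop variable when the function is called, not when it is defined. The following plain-Python sketch reproduces the difference without dlt or Spark; the `load` functions are hypothetical stand-ins that return the name they would read.

```python
tables = ["t1", "t2", "t3"]

# Late binding: every function reads t_name at call time, after the loop has finished.
late_bound = []
for t_name in tables:
    def load():
        return t_name
    late_bound.append(load)

# Default argument: the current value is captured when each function is defined.
default_bound = []
for t_name in tables:
    def load(table_name=t_name):
        return table_name
    default_bound.append(load)


# Parent function: each call creates a new scope that holds its own table_name.
def make_loader(table_name):
    def load():
        return table_name
    return load


factory_bound = [make_loader(t_name) for t_name in tables]

print([f() for f in late_bound])      # ['t3', 't3', 't3']
print([f() for f in default_bound])   # ['t1', 't2', 't3']
print([f() for f in factory_bound])   # ['t1', 't2', 't3']
```

Because the decorated functions in a pipeline are invoked after the loop has completed, only the default-argument and parent-function forms preserve the per-iteration value.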