Delta Live Tables Python language reference
This article provides details and examples for the Delta Live Tables Python programming interface. For the complete API specification, see the Python API specification.
For information on the SQL API, see the Delta Live Tables SQL language reference.
Limitations
The Delta Live Tables Python interface has the following limitations:
- The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.
- The pivot() function is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.
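For example, an eager operation such as count() belongs outside the table and view function definitions, where it runs once during the graph initialization phase. The following is a minimal sketch; the JSON path and dataset name are placeholders for illustration:

import dlt

# Runs once during graph initialization, so an eager action such as count() is acceptable here.
raw_row_count = spark.read.format("json").load("/data/events.json").count()

@dlt.table
def events():
  # Inside a table function, return only DataFrame transformations; do not call
  # eager functions such as collect(), count(), toPandas(), save(), or saveAsTable().
  return spark.read.format("json").load("/data/events.json")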
Python datasets
The Python API is defined in the dlt module. You must import the dlt module in your Delta Live Tables pipelines implemented with the Python API. Apply the @dlt.view or @dlt.table decorator to a function to define a view or table in Python. You can use the function name or the name parameter to assign the table or view name. The following example defines two different datasets: a view called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw view as input:
import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)
View and table functions must return a Spark DataFrame or a Koalas DataFrame. A Koalas DataFrame returned by a function is converted to a Spark Dataset by the Delta Live Tables runtime.
In addition to reading from external data sources, you can access datasets defined in the same pipeline with the Delta Live Tables read() function. The following example demonstrates creating a customers_filtered dataset using the read() function:
@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)
You can also use the spark.table() function to access a dataset defined in the same pipeline or a table registered in the metastore. When using the spark.table() function to access a dataset defined in the pipeline, prepend the LIVE keyword to the dataset name in the function argument:
@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)
To read data from a table registered in the metastore, omit the LIVE keyword in the function argument and optionally qualify the table name with the database name:
@dlt.table
def customers():
  return spark.table("sales.customers").where(...)
Delta Live Tables automatically captures the dependencies between datasets defined in your pipeline. This dependency information is used to determine the execution order when performing an update and to record lineage information in the event log for a pipeline.
You can also return a dataset using a spark.sql expression in a query function. To read from an internal dataset, prepend LIVE. to the dataset name:
@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Both views and tables have the following optional properties:
- comment: A human-readable description of this dataset.
- spark_conf: A Python dictionary containing Spark configurations for the execution of this query only.
- Data quality constraints enforced with expectations.
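A minimal sketch combining these properties; the dataset name, source path, constraint column, and Spark configuration value are placeholders for illustration:

import dlt

@dlt.table(
  comment="Raw inventory records ingested from JSON.",
  spark_conf={"spark.sql.shuffle.partitions": "8"})
@dlt.expect("valid_id", "id IS NOT NULL")
def inventory_raw():
  # The comment, spark_conf, and expectation apply only to this dataset.
  return spark.read.format("json").load("/data/inventory/")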
Tables also offer additional control of their materialization:
- Specify how tables are partitioned using partition_cols. You can use partitioning to speed up queries.
- You can set table properties when you define a view or table. See Table properties for more details.
- Set a storage location for table data using the path setting. By default, table data is stored in the pipeline storage location if path isn't set.
- You can optionally specify a table schema using a Python StructType or a SQL DDL string. When specified with a DDL string, the definition can include generated columns. The following examples create a table called sales with an explicitly specified schema:

from pyspark.sql.types import StructType, StructField, StringType, LongType

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

By default, Delta Live Tables infers the schema from the table definition if you don't specify a schema.
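The following sketch combines the partition_cols and path options; the upstream sales_raw dataset, the region column, and the storage path are placeholders for illustration:

import dlt

@dlt.table(
  comment="Sales orders partitioned by region.",
  partition_cols=["region"],
  path="/mnt/delta/sales_orders")
def sales_orders():
  # Partitioning by region can speed up queries that filter on that column.
  return dlt.read("sales_raw").where("region IS NOT NULL")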
Python libraries
To specify external Python libraries, use the %pip install magic command. When an update starts, Delta Live Tables runs all cells containing a %pip install command before running any table definitions. Every Python notebook included in the pipeline has access to all installed libraries. The following example installs the numpy library and makes it globally available to any Python notebook in the pipeline:
%pip install numpy
import numpy as np
To install a Python wheel package, add the wheel path to the %pip install command. Installed Python wheel packages are available to all tables in the pipeline. The following example installs a wheel named dltfns-1.0-py3-none-any.whl from the DBFS directory /dbfs/dlt/:
%pip install /dbfs/dlt/dltfns-1.0-py3-none-any.whl
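After installation, the wheel's modules can be imported and used in table definitions like any other library. The dltfns module name comes from the example wheel above, but the clean_column_names function is hypothetical and stands in for whatever helpers your wheel provides:

import dlt
from dltfns import clean_column_names  # hypothetical helper exposed by the installed wheel

@dlt.table
def customers_clean():
  # clean_column_names is assumed to take and return a Spark DataFrame.
  return clean_column_names(dlt.read("customers_raw"))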
Python API specification
Python module
Delta Live Tables Python functions are defined in the dlt module. Your pipelines implemented with the Python API must import this module:
import dlt
Create table
To define a table in Python, apply the @table decorator. The @table decorator is an alias for the @create_table decorator.
import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
  return (<query>)
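As a concrete sketch of this template, the following defines a table that reads from a hypothetical upstream orders_raw dataset and drops rows without an order ID; the dataset and column names are illustrative only:

import dlt

@dlt.table(
  name="orders_clean",
  comment="Orders with a valid order_id.",
  partition_cols=["order_date"])
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")
def orders_clean():
  # Rows that violate the expectation are dropped from the target dataset.
  return dlt.read("orders_raw").where("order_date IS NOT NULL")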
Create view
To define a view in Python, apply the @view decorator. The @view decorator is an alias for the @create_view decorator.
import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
  return (<query>)
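For example, a view built on the customers_raw table defined earlier in this article; the city column and the expectation are assumed for illustration:

import dlt

@dlt.view(
  name="customers_by_city",
  comment="Customers with a known city.")
@dlt.expect("has_city", "city IS NOT NULL")
def customers_by_city():
  # Views are not materialized; the expectation still reports data quality metrics.
  return dlt.read("customers_raw").where("city IS NOT NULL")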
Python properties
| @table or @view |
| --- |
| name (Type: str): An optional name for the table or view. If not defined, the function name is used as the table or view name. |
| comment (Type: str): An optional description for the table. |
| spark_conf (Type: dict): An optional list of Spark configurations for the execution of this query. |
| table_properties (Type: dict): An optional list of table properties for the table. |
| path (Type: str): An optional storage location for table data. If not set, the system defaults to the pipeline storage location. |
| partition_cols (Type: array): An optional collection, for example, a list, of one or more columns to use for partitioning the table. |
| schema (Type: str or StructType): An optional schema definition for the table. Schemas can be defined as a SQL DDL string or with a Python StructType. |
| temporary (Type: bool): Create a temporary table. No metadata is persisted for this table. The default is False. |
| Table or view definition |
| --- |
| def <function-name>(): A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name. |
| query: A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame. Use dlt.read() or spark.table() to read from a dataset defined in the same pipeline. When using the spark.table() function, prepend the LIVE keyword to the dataset name in the function argument. You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name. Use the spark.sql function to define a SQL query to create the return dataset. Use PySpark syntax to define Delta Live Tables queries with Python. |
| Expectations |
| --- |
| @expect("description", "constraint"): Declare a data quality constraint identified by description. If a row violates the expectation, include the row in the target dataset. |
| @expect_or_drop("description", "constraint"): Declare a data quality constraint identified by description. If a row violates the expectation, drop the row from the target dataset. |
| @expect_or_fail("description", "constraint"): Declare a data quality constraint identified by description. If a row violates the expectation, immediately stop execution. |
| @expect_all(expectations): Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset. |
| @expect_all_or_drop(expectations): Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset. |
| @expect_all_or_fail(expectations): Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution. |
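A minimal sketch of the dictionary form, assuming a hypothetical orders_raw dataset with order_id and amount columns:

import dlt

# Each key is an expectation description; each value is its constraint.
valid_order = {
  "valid_order_id": "order_id IS NOT NULL",
  "positive_amount": "amount > 0"
}

@dlt.table
@dlt.expect_all_or_drop(valid_order)
def orders_validated():
  # Rows violating any of the expectations are dropped from the target dataset.
  return dlt.read("orders_raw")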
Table properties
In addition to the table properties supported by Delta Lake, you can set the following table properties.
| Table properties |
| --- |
| pipelines.autoOptimize.managed (Default: true): Enables or disables automatic scheduled optimization of this table. |
| pipelines.autoOptimize.zOrderCols (Default: None): An optional string containing a comma-separated list of column names to z-order this table by. |
| pipelines.reset.allowed (Default: true): Controls whether a full refresh is allowed for this table. |
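These properties are set through the table_properties argument when defining a table. A minimal sketch, assuming a hypothetical events_raw dataset with an event_date column, that disables full refreshes and z-orders by event_date:

import dlt

@dlt.table(
  table_properties={
    "pipelines.reset.allowed": "false",
    "pipelines.autoOptimize.zOrderCols": "event_date"
  })
def events_history():
  # Property values are passed as strings, as with other Delta Lake table properties.
  return dlt.read("events_raw")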