Delta Live Tables Python language reference
This article provides details for the Delta Live Tables Python programming interface.
For information on the SQL API, see the Delta Live Tables SQL language reference.
For details specific to configuring Auto Loader, see What is Auto Loader?.
Limitations
The Delta Live Tables Python interface has the following limitations:
- The Python table and view functions must return a DataFrame. Some functions that operate on DataFrames do not return DataFrames and should not be used. Because DataFrame transformations are executed after the full dataflow graph has been resolved, using such operations might have unintended side effects. These operations include functions such as collect(), count(), toPandas(), save(), and saveAsTable(). However, you can include these functions outside of table or view function definitions because this code is run once during the graph initialization phase.
- The pivot() function is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.
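To illustrate the first limitation, here is a minimal sketch (all table and column names are hypothetical) that keeps an eager collect() outside the decorated function, where it runs once during graph initialization, while the decorated function only returns a DataFrame:

import dlt
from pyspark.sql.functions import col

# `spark` is provided by the pipeline runtime.
# Runs once during graph initialization, so an eager action such as collect()
# is acceptable here. The lookup table name is hypothetical.
allowed_regions = [
    row["region"]
    for row in spark.table("reference.allowed_regions").collect()
]

@dlt.table(comment="Orders restricted to allowed regions (hypothetical example)")
def filtered_orders():
    # The decorated function must return a DataFrame; avoid collect(), count(),
    # toPandas(), save(), and saveAsTable() in this body.
    return spark.table("samples.orders").where(col("region").isin(allowed_regions))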
Import the dlt Python module
Delta Live Tables Python functions are defined in the dlt module. Your pipelines implemented with the Python API must import this module:
import dlt
Create a Delta Live Tables materialized view or streaming table
In Python, Delta Live Tables determines whether to update a dataset as a materialized view or streaming table based on the defining query. The @table decorator is used to define both materialized views and streaming tables.
To define a materialized view in Python, apply @table to a query that performs a static read against a data source. To define a streaming table, apply @table to a query that performs a streaming read against a data source. Both dataset types have the same syntax specification as follows:
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
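For example, the following sketch (the source table and Auto Loader path are hypothetical, not part of the syntax above) defines one materialized view from a static read and one streaming table from a streaming read:

import dlt

# `spark` is provided by the pipeline runtime.

@dlt.table(comment="Materialized view defined by a static read (hypothetical source table)")
def trips_raw():
    return spark.table("samples.nyctaxi.trips")

@dlt.table(comment="Streaming table defined by a streaming read with Auto Loader (hypothetical path)")
def orders_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/data/orders")  # hypothetical storage location
    )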
Create a Delta Live Tables view
To define a view in Python, apply the @view decorator. As with the @table decorator, you can use views in Delta Live Tables for either static or streaming datasets. The following is the syntax for defining views with Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
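As a minimal sketch, assuming a dataset named trips_raw is defined elsewhere in the same pipeline, a view can hold an intermediate result that is not published outside the pipeline:

import dlt
from pyspark.sql.functions import col

@dlt.view(comment="Trips with a positive distance (hypothetical intermediate view)")
def valid_trips():
    # dlt.read() performs a complete read from a dataset defined in the same pipeline.
    return dlt.read("trips_raw").where(col("trip_distance") > 0)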
Python Delta Live Tables properties
The following tables describe the options and properties you can specify while defining tables and views with Delta Live Tables:
@table or @view

name
Type: str
An optional name for the table or view. If not defined, the function name is used as the table or view name.

comment
Type: str
An optional description for the table.

spark_conf
Type: dict
An optional list of Spark configurations for the execution of this query.

table_properties
Type: dict
An optional list of table properties for the table.

path
Type: str
An optional storage location for table data. If not set, the system will default to the pipeline storage location.

partition_cols
Type: a collection of str
An optional collection, for example, a list, of one or more columns to use for partitioning the table.

schema
Type: str or StructType
An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.

temporary
Type: bool
Create a temporary table. No metadata is persisted for this table. The default is False.
Table or view definition

def <function-name>()
A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name.

query
A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame.
Use dlt.read() or spark.table() to perform a complete read from a dataset defined in the same pipeline. When using the spark.table() function to read a dataset defined in the same pipeline, prepend the LIVE keyword to the dataset name in the function argument.
You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name.
Use dlt.read_stream() to perform a streaming read from a dataset defined in the same pipeline.
Use the spark.sql function to define a SQL query to create the return dataset.
Use PySpark syntax to define Delta Live Tables queries with Python.
Expectations

@expect("description", "constraint")
Declare a data quality constraint identified by description. If a row violates the expectation, include the row in the target dataset.

@expect_or_drop("description", "constraint")
Declare a data quality constraint identified by description. If a row violates the expectation, drop the row from the target dataset.

@expect_or_fail("description", "constraint")
Declare a data quality constraint identified by description. If a row violates the expectation, immediately stop execution.

@expect_all(expectations)
Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset.

@expect_all_or_drop(expectations)
Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset.

@expect_all_or_fail(expectations)
Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution.
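The following sketch (column names and constraints are hypothetical) shows the single-constraint and dictionary forms of these decorators attached to one table definition:

import dlt

@dlt.table(comment="Cleaned events with data quality expectations (hypothetical example)")
@dlt.expect("valid_timestamp", "event_ts IS NOT NULL")    # violating rows are kept and recorded in metrics
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")         # violating rows are dropped
@dlt.expect_all_or_fail({
    "positive_amount": "amount > 0",
    "known_status": "status IN ('open', 'closed')"
})                                                        # any violation stops the update
def cleaned_events():
    return dlt.read("raw_events")  # raw_events is assumed to be defined elsewhere in the pipeline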
Change data capture with Python in Delta Live Tables
Preview
Delta Live Tables support for SCD type 2 is in Public Preview.
Use the apply_changes() function in the Python API to use Delta Live Tables CDC functionality. The Delta Live Tables Python CDC interface also provides the create_streaming_live_table() function. You can use this function to create the target table required by the apply_changes() function.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Note
The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the apply_as_deletes condition.
Important
You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the apply_changes target table, you must also include the __START_AT and __END_AT columns with the same data type as the sequence_by field.
See Change data capture with Delta Live Tables.
Arguments

target
Type: str
The name of the table to be updated. You can use the create_streaming_live_table() function to create the target table before executing the apply_changes() function.
This parameter is required.

source
Type: str
The data source containing CDC records.
This parameter is required.

keys
Type: list
The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.
You can specify either:
- A list of strings: ["userId", "orderId"]
- A list of Spark SQL col() functions: [col("userId"), col("orderId")]
Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).
This parameter is required.

sequence_by
Type: str or col()
The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.
You can specify either:
- A string: "sequenceNum"
- A Spark SQL col() function: col("sequenceNum")
Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).
This parameter is required.
ignore_null_updates
Type: bool
Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values will be overwritten with null values.
This parameter is optional.
The default is False.

apply_as_deletes
Type: str or expr()
Specifies when a CDC event should be treated as a DELETE rather than an upsert.
You can specify either:
- A string: "Operation = 'DELETE'"
- A Spark SQL expr() function: expr("Operation = 'DELETE'")
This parameter is optional.

apply_as_truncates
Type: str or expr()
Specifies when a CDC event should be treated as a full table TRUNCATE. The apply_as_truncates parameter is supported only for SCD type 1; SCD type 2 does not support truncates.
You can specify either:
- A string: "Operation = 'TRUNCATE'"
- A Spark SQL expr() function: expr("Operation = 'TRUNCATE'")
This parameter is optional.
column_list
except_column_list
Type: list
A subset of columns to include in the target table. Use column_list to specify the complete list of columns to include. Use except_column_list to specify the columns to exclude. You can declare either value as a list of strings or as Spark SQL col() functions.
Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).
This parameter is optional.
The default is to include all columns in the target table when no column_list or except_column_list argument is passed to the function.

stored_as_scd_type
Type: str or int
Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2.
This clause is optional.
The default is SCD type 1.

track_history_column_list
track_history_except_column_list
Type: list
A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to exclude from tracking. You can declare either value as a list of strings or as Spark SQL col() functions.
Arguments to col() functions cannot include qualifiers. For example, you can use col(userId), but you cannot use col(source.userId).
This parameter is optional.
The default is to include all columns in the target table when no track_history_column_list or track_history_except_column_list argument is passed to the function.
To use these parameters, history tracking must be enabled in the pipeline settings.
The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the apply_as_deletes argument.
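Putting these pieces together, a minimal SCD type 1 sketch might look like the following. The CDC source table cdc_data.users and its userId, operation, and sequenceNum columns are hypothetical:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view(comment="Hypothetical CDC feed read as a stream")
def users_cdc():
    return spark.readStream.format("delta").table("cdc_data.users")

# Create the target streaming table, then apply the change feed into it.
dlt.create_streaming_live_table("users_current")

dlt.apply_changes(
    target = "users_current",
    source = "users_cdc",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)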
Create a target table for CDC output
Use the create_streaming_live_table() function to create a target table for the apply_changes() output records.
Note
The create_target_table() function is deprecated. Databricks recommends updating existing code to use the create_streaming_live_table() function.
create_streaming_live_table(
name = "<table-name>",
comment = "<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
path="<storage-location-path>",
schema="schema-definition"
)
Arguments

name
Type: str
The table name.
This parameter is required.

comment
Type: str
An optional description for the table.

spark_conf
Type: dict
An optional list of Spark configurations for the execution of this query.

table_properties
Type: dict
An optional list of table properties for the table.

partition_cols
Type: a collection of str
An optional list of one or more columns to use for partitioning the table.

path
Type: str
An optional storage location for table data. If not set, the system will default to the pipeline storage location.

schema
Type: str or StructType
An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.
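For example, a sketch of a target table for SCD type 2 output with an explicit schema might look like the following. The column names are hypothetical; note that __START_AT and __END_AT use the same data type as the intended sequence_by column (an integer sequenceNum in this sketch):

import dlt

# Hypothetical schema; __START_AT and __END_AT must match the data type
# of the column passed to sequence_by in the corresponding apply_changes() call.
dlt.create_streaming_live_table(
    name = "users_history",
    comment = "SCD type 2 history of user records (hypothetical example)",
    schema = """
        userId INT,
        name STRING,
        city STRING,
        __START_AT INT,
        __END_AT INT
    """
)

The matching apply_changes() call would then set target = "users_history" and stored_as_scd_type = 2.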