Delta Live Tables language reference

Preview

This feature is in Public Preview. To sign up for access, see Request Access to Delta Live Tables.

Python

Create table

To define a table in Python, apply the @table decorator. The @table decorator is an alias for the @create_table decorator.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)
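
For example, a concrete table definition might look like the following sketch. The table name top_customers, the source table sales.customers, and its columns are hypothetical and used only for illustration:

import dlt

@dlt.table(
  name="top_customers",
  comment="Customers with total spend above 10,000.",
  table_properties={"pipelines.autoOptimize.zOrderCols" : "customer_id"})
def top_customers():
    # Complete read of a table registered in the metastore.
    return spark.table("sales.customers").where("total_spend > 10000")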

Create view

To define a view in Python, apply the @view decorator. The @view decorator is an alias for the @create_view decorator.

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)
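
A concrete view definition, assuming a hypothetical dataset named customers defined elsewhere in the same pipeline, might look like this sketch:

import dlt

@dlt.view(
  name="customers_clean",
  comment="Customers with a non-null email address.")
@dlt.expect_or_drop("valid_email", "email IS NOT NULL")
def customers_clean():
    # Complete read of a dataset defined in the same pipeline.
    return dlt.read("customers")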

Python libraries

To specify external Python libraries, use the %pip install magic command. When an update starts, Delta Live Tables runs all cells containing a %pip install command before running any table definitions. Every Python notebook included in the pipeline has access to all installed libraries. The following example installs a package called logger and makes it globally available to any Python notebook in the pipeline:

%pip install logger

import dlt
from logger import log_info

@dlt.table
def dataset():
    log_info(...)
    return dlt.read(...)

Python properties

@table or @view

name

Type: str

An optional name for the table or view. If not defined, the function name is used as the table or view name.

comment

Type: str

An optional description for the table.

spark_conf

Type: dict

An optional list of Spark configurations for the execution of this query.

table_properties

Type: dict

An optional list of table properties for the table.

path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

partition_cols

Type: array

An optional list of one or more columns to use for partitioning the table.

schema

Type: str or StructType

An optional schema definition for the table. Schemas can be defined as a SQL DDL string, or with a Python StructType.
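
The following sketch uses several of these properties together. The dataset name orders, its columns, and the partition column are hypothetical; the schema could equally be passed as the DDL string "order_id STRING, order_date DATE, amount DOUBLE":

import dlt
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType

@dlt.table(
  name="orders_partitioned",
  comment="Orders partitioned by order date.",
  partition_cols=["order_date"],
  table_properties={"pipelines.reset.allowed" : "false"},
  schema=StructType([
    StructField("order_id", StringType()),
    StructField("order_date", DateType()),
    StructField("amount", DoubleType())]))
def orders_partitioned():
    # Complete read of a dataset defined in the same pipeline.
    return dlt.read("orders")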

Table or view definition

def <function-name>()

A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name.

query

A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame.

Use dlt.read() or spark.table() to perform a complete read from a dataset defined in the same pipeline. When using the spark.table() function to read from a dataset defined in the same pipeline, prepend the LIVE keyword to the dataset name in the function argument. For example, to read from a dataset named customers:

spark.table("LIVE.customers")

You can also use the spark.table() function to read from a table registered in the metastore by omitting the LIVE keyword and optionally qualifying the table name with the database name:

spark.table("sales.customers")

Use dlt.read_stream() to perform an incremental read from a dataset defined in the same pipeline.

Use the spark.sql function to define a SQL query to create the return dataset.

Use PySpark syntax to define Delta Live Tables queries with Python.
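
The following sketch combines these read patterns. The pipeline datasets customers and events and the metastore table sales.transactions are hypothetical:

import dlt

@dlt.table
def customer_counts():
    # Complete read of a dataset defined in the same pipeline.
    return dlt.read("customers").groupBy("region").count()

@dlt.table
def recent_events():
    # Incremental read of a dataset defined in the same pipeline.
    return dlt.read_stream("events").where("event_date >= '2021-01-01'")

@dlt.view
def regional_sales():
    # SQL query over a table registered in the metastore.
    return spark.sql("SELECT region, SUM(amount) AS total FROM sales.transactions GROUP BY region")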

Expectations

@expect("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, include the row in the target dataset.

@expect_or_drop("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, drop the row from the target dataset.

@expect_or_fail("description", "constraint")

Declare a data quality constraint identified by description. If a row violates the expectation, immediately stop execution.

@expect_all(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, include the row in the target dataset.

@expect_all_or_drop(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, drop the row from the target dataset.

@expect_all_or_fail(expectations)

Declare one or more data quality constraints. expectations is a Python dictionary, where the key is the expectation description and the value is the expectation constraint. If a row violates any of the expectations, immediately stop execution.
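
The following sketch shows the single-constraint and dictionary forms together. The payments dataset and its columns are hypothetical:

import dlt

@dlt.table
@dlt.expect("valid_timestamp", "event_time IS NOT NULL")
@dlt.expect_or_drop("valid_id", "payment_id IS NOT NULL")
@dlt.expect_all_or_fail({
  "positive_amount" : "amount > 0",
  "known_currency" : "currency IN ('USD', 'EUR')"})
def validated_payments():
    # Rows violating valid_timestamp are retained, rows violating valid_id
    # are dropped, and any violation of the dictionary constraints stops the update.
    return dlt.read("payments")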

SQL

Create table

CREATE [TEMPORARY] [INCREMENTAL] LIVE TABLE table_name
  [(
    [
    col_name1 col_type1 [ COMMENT col_comment1 ],
    col_name2 col_type2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement
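
For example, a concrete table definition might look like the following sketch; the source table sales.customers and its columns are hypothetical:

CREATE LIVE TABLE top_customers (
  CONSTRAINT valid_id EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (region)
COMMENT "Customers with total spend above 10,000."
TBLPROPERTIES ("pipelines.autoOptimize.zOrderCols" = "customer_id")
AS SELECT customer_id, region, total_spend
  FROM sales.customers
  WHERE total_spend > 10000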

Create view

CREATE [INCREMENTAL] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement
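
A concrete view definition, assuming a hypothetical dataset named customers defined elsewhere in the same pipeline, might look like this sketch:

CREATE LIVE VIEW customers_clean (
  CONSTRAINT valid_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Customers with a non-null email address."
AS SELECT customer_id, email, region
  FROM LIVE.customers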

SQL properties

CREATE TABLE or VIEW clause

TEMPORARY

Create a temporary table. No metadata is persisted for this table.

INCREMENTAL

Create a table that reads an input dataset as a stream.

PARTITIONED BY

An optional list of one or more columns to use for partitioning the table.

LOCATION

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

COMMENT

An optional description for the table.

TBLPROPERTIES

An optional list of table properties for the table.

select_statement

A Delta Live Tables query that defines the dataset for the table.
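
As a sketch of the INCREMENTAL clause, the following reads a hypothetical dataset named events defined elsewhere in the pipeline as a stream; the STREAM() wrapper around the LIVE dataset is an assumption about how the streaming input is referenced:

CREATE INCREMENTAL LIVE TABLE recent_events
COMMENT "Events read incrementally from the events dataset."
AS SELECT * FROM STREAM(LIVE.events)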

CONSTRAINT clause

EXPECT expectation_name

Define data quality constraint expectation_name. If no ON VIOLATION clause is specified, rows that violate the constraint are included in the target dataset.

ON VIOLATION

Optional action to take for failed rows:

  • FAIL UPDATE: Immediately stop pipeline execution.
  • DROP ROW: Drop the record and continue processing.
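
The following sketch contrasts the two actions with the default behavior; the payments dataset and its columns are hypothetical:

CREATE LIVE TABLE validated_payments (
  CONSTRAINT valid_id EXPECT (payment_id IS NOT NULL) ON VIOLATION FAIL UPDATE,
  CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
  CONSTRAINT known_currency EXPECT (currency IN ('USD', 'EUR'))
)
AS SELECT * FROM LIVE.payments

Rows that violate known_currency are retained in the target dataset because no ON VIOLATION action is specified for that constraint.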

Table properties

In addition to the table properties supported by Delta Lake, you can set the following table properties.

Table properties

pipelines.autoOptimize.managed

Default: true

Enables or disables automatic scheduled optimization of this table.

pipelines.autoOptimize.zOrderCols

Default: None

An optional comma-separated list of column names to z-order this table by.

pipelines.reset.allowed

Default: true

Controls whether a full refresh is allowed for this table.

pipelines.trigger.interval

Default: Based on flow type

Controls the trigger interval for a flow updating this table. Default values:

  • Four seconds for streaming queries.
  • One minute for complete queries when all input data is from Delta sources.
  • Ten minutes for complete queries when some data sources may be non-Delta. See Complete tables in continuous pipelines.

The value is a number plus the time unit. The following are the valid time units:

  • second, seconds
  • minute, minutes
  • hour, hours
  • day, days

You can use the singular or plural unit when defining the value, for example:

  • {"pipelines.trigger.interval" : "1 hour"}
  • {"pipelines.trigger.interval" : "10 seconds"}
  • {"pipelines.trigger.interval" : "30 second"}
  • {"pipelines.trigger.interval" : "1 minute"}
  • {"pipelines.trigger.interval" : "10 minutes"}
  • {"pipelines.trigger.interval" : "10 minute"}