Delta Live Tables language reference

Preview

This feature is in Public Preview. Contact your Databricks representative to request access.

Python

Create table

import dlt

@dlt.create_table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"])
@dlt.expect("<description>", "<constraint>")
@dlt.expect_or_fail("<description>", "<constraint>")
@dlt.expect_or_drop("<description>", "<constraint>")
def <function-name>():
    return (<query>)
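
For example, a minimal table definition might look like the following. This is a sketch: the source path, dataset name, and column name are illustrative assumptions, not values from your pipeline.

import dlt

@dlt.create_table(
  name="filtered_events",
  comment="Events with a non-null user ID.")
@dlt.expect_or_drop("valid user ID", "user_id IS NOT NULL")
def filtered_events():
    # Hypothetical Delta source path; replace with your own input data.
    return spark.read.format("delta").load("/data/events")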

Create view

import dlt

@dlt.create_view(
  name="<name>",
  comment="<comment>")
@dlt.expect("<description>", "<constraint>")
@dlt.expect_or_fail("<description>", "<constraint>")
@dlt.expect_or_drop("<description>", "<constraint>")
def <function-name>():
    return (<query>)
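
For example, the following sketch defines a view over another dataset in the same pipeline; the filtered_events dataset and column names are assumptions.

import dlt

@dlt.create_view(
  name="event_summary",
  comment="Event counts by type.")
def event_summary():
    # Reads another dataset defined in this pipeline (assumed name).
    return dlt.read("filtered_events").groupBy("event_type").count()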

Notebook-scoped Python libraries

Use the %pip install magic command to install notebook-scoped Python libraries. Libraries installed in a notebook are available only to that notebook. The following example installs a package named logger and makes it available to notebook commands:

%pip install logger

import dlt
from logger import log_info

@dlt.create_table
def dataset():
    log_info(...)
    return dlt.read(...)

Python properties

@create_table or @create_view

name

Type: str

An optional name for the table or view. If not defined, the function name is used as the table or view name.

comment

Type: str

An optional description for the table.

spark_conf

Type: dict

An optional list of Spark configurations for the execution of this query.

table_properties

Type: dict

An optional list of table properties for the table.

path

Type: str

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

partition_cols

Type: array

An optional list of one or more columns to use for partitioning the table.
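
The following sketch shows these parameters together; the storage path, Spark configuration value, dataset name, and column names are illustrative assumptions.

import dlt

@dlt.create_table(
  name="sales",
  comment="Sales records partitioned by order date.",
  spark_conf={"spark.sql.shuffle.partitions": "64"},
  table_properties={"pipelines.reset.allowed": "false"},
  path="/mnt/pipelines/sales",
  partition_cols=["order_date"])
def sales():
    # Assumes a raw_sales dataset defined elsewhere in this pipeline.
    return dlt.read("raw_sales")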

Table or view definition

def <function-name>()

A Python function that defines the dataset. If the name parameter is not set, then <function-name> is used as the target dataset name.

query

A Spark SQL statement that returns a Spark Dataset or Koalas DataFrame.

Use dlt.read() to perform a complete read from a dataset defined in a Delta Live Tables pipeline.

Use dlt.read_stream() to perform an incremental read from a dataset defined in a Delta Live Tables pipeline.
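
For example, the following sketch contrasts the two reads; the orders dataset and column names are assumptions.

import dlt

@dlt.create_table
def daily_totals():
    # Complete read: reprocesses the full orders dataset on each update.
    return dlt.read("orders").groupBy("order_date").count()

@dlt.create_table
def orders_cleaned():
    # Incremental read: processes only records added since the last update.
    return dlt.read_stream("orders").where("status IS NOT NULL")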

Expectations

@expect("description", "constraint")

Declare a data quality constraint specified by "constraint". If a row fails validation, include the row in the target dataset and use the "description" value to create a log entry describing the failure.

@expect_or_drop("description", "constraint")

Declare a data quality constraint specified by "constraint". If a row fails validation, drop the row from the target dataset and use the "description" value to create a log entry describing the failure.

@expect_or_fail("description", "constraint")

Declare a data quality constraint specified by "constraint". If a row fails validation, immediately stop execution and use the "description" value to create a log entry describing the failure.
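
The following sketch applies all three expectation types to one table; the raw_events dataset and column names are assumptions.

import dlt

@dlt.create_table
@dlt.expect("valid timestamp", "event_time IS NOT NULL")
@dlt.expect_or_drop("valid id", "id IS NOT NULL")
@dlt.expect_or_fail("positive count", "event_count > 0")
def validated_events():
    # Rows with a null event_time are logged and kept; rows with a null id
    # are logged and dropped; a non-positive event_count stops the update.
    return dlt.read("raw_events")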

SQL

Create table

CREATE [TEMPORARY] [INCREMENTAL] LIVE TABLE table_name
  [(
    [
    col_name1 col_type1 [ COMMENT col_comment1 ],
    col_name2 col_type2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement
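
For example, a filled-in table definition might look like the following sketch; the raw_sales dataset and column names are assumptions.

CREATE LIVE TABLE filtered_sales (
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
PARTITIONED BY (region)
COMMENT "Sales rows with positive amounts."
TBLPROPERTIES ("pipelines.reset.allowed" = "false")
AS SELECT * FROM LIVE.raw_sales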

Create view

CREATE [INCREMENTAL] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement
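
For example, the following sketch defines an incremental view that reads another pipeline dataset as a stream; the orders dataset and column names are assumptions.

CREATE INCREMENTAL LIVE VIEW recent_orders
COMMENT "New orders as they arrive."
AS SELECT order_id, amount FROM STREAM(LIVE.orders)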

SQL properties

CREATE TABLE or VIEW clause

TEMPORARY

Create a temporary table. No metadata is persisted for this table.

INCREMENTAL

Create a table that reads an input dataset as a stream.

PARTITIONED BY

An optional list of one or more columns to use for partitioning the table.

LOCATION

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

COMMENT

An optional description for the table.

TBLPROPERTIES

An optional list of table properties for the table.

select_statement

A Delta Live Tables query that defines the dataset for the table.

CONSTRAINT clause

CONSTRAINT expectation_name EXPECT (expectation_expr)

Define a data quality constraint named expectation_name. If no ON VIOLATION action is defined, rows that fail validation are logged and included in the target dataset.

ON VIOLATION

Optional action to take for failed rows:

  • FAIL UPDATE: Immediately stop pipeline execution.
  • DROP ROW: Drop the record, record the validation failure, and continue processing.
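
The following sketch shows both actions, and the default, on one table; the raw_orders dataset and column names are assumptions.

CREATE LIVE TABLE validated_orders (
  CONSTRAINT valid_id EXPECT (order_id IS NOT NULL),
  CONSTRAINT valid_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
  CONSTRAINT valid_date EXPECT (order_date IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM LIVE.raw_orders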

Table properties

In addition to the table properties supported by Delta Lake, you can set the following table properties.

pipelines.autoOptimize.managed

Default: true

Enables or disables automatic scheduled optimization of this table.

pipelines.autoOptimize.zOrderCols

Default: None

An optional comma-separated list of column names to z-order this table by.

pipelines.reset.allowed

Default: true

Controls whether a full refresh is allowed for this table.

pipelines.trigger.interval

Default: Based on flow type

The minimum interval between updates of this table.
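
The following sketch sets several of these properties on one table; the raw_events dataset and column names are assumptions.

CREATE LIVE TABLE events
TBLPROPERTIES (
  "pipelines.autoOptimize.zOrderCols" = "event_time, user_id",
  "pipelines.reset.allowed" = "false",
  "pipelines.trigger.interval" = "1 hour"
)
AS SELECT * FROM LIVE.raw_events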