Delta Live Tables SQL language reference

This article provides details for the Delta Live Tables SQL programming interface.

You can use Python user-defined functions (UDFs) in your SQL queries, but you must define these UDFs in Python files before calling them in SQL source files. See User-defined scalar functions - Python.

Limitations

The PIVOT clause is not supported. The pivot operation in Spark requires eager loading of input data to compute the schema of the output. This capability is not supported in Delta Live Tables.
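When the set of pivot values is known in advance, one possible workaround is conditional aggregation, because the output columns are then fixed by the query itself rather than computed from the data. The following is a minimal sketch, not an official recommendation; the dataset LIVE.orders and its columns customer_id, quarter, and amount are hypothetical:

```sql
-- Hypothetical dataset LIVE.orders with columns customer_id, quarter, amount.
-- Each pivoted column is written out explicitly, so the output schema is
-- known without reading the input data.
CREATE OR REFRESH LIVE TABLE orders_by_quarter
AS SELECT
  customer_id,
  SUM(CASE WHEN quarter = 'Q1' THEN amount ELSE 0 END) AS q1_amount,
  SUM(CASE WHEN quarter = 'Q2' THEN amount ELSE 0 END) AS q2_amount,
  SUM(CASE WHEN quarter = 'Q3' THEN amount ELSE 0 END) AS q3_amount,
  SUM(CASE WHEN quarter = 'Q4' THEN amount ELSE 0 END) AS q4_amount
FROM LIVE.orders
GROUP BY customer_id
```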

Create a Delta Live Tables materialized view or streaming table

You use the same basic SQL syntax when declaring either a streaming table or a materialized view (also referred to as a LIVE TABLE).

You can only declare streaming tables using queries that read against a streaming source. Databricks recommends using Auto Loader for streaming ingestion of files from cloud object storage. See Auto Loader SQL syntax.

You must include the STREAM() function around a dataset name when specifying other tables or views in your pipeline as a streaming source.

The following describes the syntax for declaring materialized views and streaming tables with SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Create a Delta Live Tables view

The following describes the syntax for declaring views with SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement
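
As an illustration of the view syntax above, the following sketch declares a temporary view with one expectation. It assumes a pipeline dataset named taxi_raw; the view name, the expectation name, and the fare_amount column are hypothetical:

```sql
-- Hypothetical view over LIVE.taxi_raw; fare_amount is an assumed column.
CREATE TEMPORARY LIVE VIEW valid_trips (
  -- Rows that fail the expectation are dropped from the view.
  CONSTRAINT positive_fare EXPECT (fare_amount > 0) ON VIOLATION DROP ROW
)
COMMENT "Trips with a positive fare"
AS SELECT * FROM LIVE.taxi_raw
```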

Auto Loader SQL syntax

The following describes the syntax for working with Auto Loader in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option-value>",
      "<option-key>", "<option-value>",
      ...
    )
  )

You can use supported format options with Auto Loader. Using the map() function, you can pass any number of options to the cloud_files() function. Options are key-value pairs, where the keys and values are strings. For details on supported formats and options, see File format options.
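
For instance, a CSV source with a header row might be ingested as sketched below. The header and delimiter keys are standard CSV reader options; the table name and path are only illustrative, and both option values are passed as strings:

```sql
-- Illustrative only: ingest CSV files with Auto Loader, passing two
-- format options through map(). Keys and values are all strings.
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/retail-org/customers/",
    "csv",
    map(
      "header", "true",
      "delimiter", ","
    )
  )
```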

Example: Define tables

You can create a dataset by reading from an external data source or from datasets defined in a pipeline. To read from an internal dataset, prepend the LIVE keyword to the dataset name. The following example defines two different datasets: a table called taxi_raw that takes a JSON file as the input source and a table called filtered_data that takes the taxi_raw table as input:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Example: Read from a streaming source

To read data from a streaming source, such as Auto Loader or an internal dataset, define a STREAMING table:

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

For more information on streaming data, see Transform data with Delta Live Tables.

Control how tables are materialized

Tables also offer additional control of their materialization through the PARTITIONED BY, LOCATION, and TBLPROPERTIES clauses. See SQL properties.

Note

For tables less than 1 TB in size, Databricks recommends letting Delta Live Tables control data organization. Unless you expect your table to grow beyond a terabyte, you should generally not specify partition columns.
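
As a sketch of these controls, the following definition sets a table property and leaves partitioning to Delta Live Tables, in line with the recommendation above. The table name, the query, and the "quality" property key are hypothetical; it assumes a pipeline dataset named sales with a customer_id column:

```sql
-- Hypothetical table: no PARTITIONED BY clause, so data organization is
-- left to Delta Live Tables. "quality" is an arbitrary illustrative key.
CREATE OR REFRESH LIVE TABLE sales_summary
COMMENT "Order counts per customer"
TBLPROPERTIES ("quality" = "gold")
AS SELECT customer_id, count(*) AS order_count
FROM LIVE.sales
GROUP BY customer_id
```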

Example: Specify a schema and partition columns

You can optionally specify a schema when you define a table. The following example specifies the schema for the target table, including using Delta Lake generated columns and defining partition columns for the table:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

If you don’t specify a schema, Delta Live Tables infers it from the table definition.

Example: Define table constraints

Note

Table constraint support is in Public Preview. To define table constraints, your pipeline must be a Unity Catalog-enabled pipeline and configured to use the preview channel.

When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. The following example defines a table with a primary key constraint and a foreign key constraint:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Set configuration values for a table or view

Use SET to specify a configuration value for a table or view, including Spark configurations. Any table or view you define in a notebook after the SET statement has access to the defined value. Any Spark configurations specified using the SET statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}. The following example sets a Spark configuration value named startDate and uses that value in a query:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

To specify multiple configuration values, use a separate SET statement for each value.
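
For example, a date range could be expressed with two values, each in its own SET statement. This sketch extends the example above; endDate is a hypothetical second configuration value:

```sql
-- One SET statement per configuration value.
SET startDate='2020-01-01';
SET endDate='2020-12-31';

-- Both values are available to datasets defined after the SET statements.
CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate} AND date < ${endDate}
```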

SQL properties

CREATE TABLE or VIEW

TEMPORARY

Create a table but do not publish metadata for the table. The TEMPORARY clause instructs Delta Live Tables to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a temporary table persists for the lifetime of the pipeline that creates it, and not just a single update.

STREAMING

Create a table that reads an input dataset as a stream. The input dataset must be a streaming data source, for example, Auto Loader or a STREAMING table.

PARTITIONED BY

An optional list of one or more columns to use for partitioning the table.

LOCATION

An optional storage location for table data. If not set, the system will default to the pipeline storage location.

COMMENT

An optional description for the table.

column_constraint

An optional informational primary key or foreign key constraint on the column.

table_constraint

An optional informational primary key or foreign key constraint on the table.

TBLPROPERTIES

An optional list of table properties for the table.

select_statement

A Delta Live Tables query that defines the dataset for the table.

CONSTRAINT clause

EXPECT expectation_name

Define the data quality constraint expectation_name. If the ON VIOLATION clause is not defined, rows that violate the constraint are still added to the target dataset.

ON VIOLATION

Optional action to take for failed rows:

  • FAIL UPDATE: Immediately stop pipeline execution.

  • DROP ROW: Drop the record and continue processing.
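
The three behaviors described above can be sketched side by side in one table definition. It assumes a pipeline dataset named taxi_raw; the table name, expectation names, and the columns pickup_datetime, fare_amount, and trip_id are hypothetical:

```sql
CREATE OR REFRESH LIVE TABLE checked_trips (
  -- No ON VIOLATION clause: violating rows are kept in the target dataset.
  CONSTRAINT valid_pickup EXPECT (pickup_datetime IS NOT NULL),
  -- DROP ROW: violating rows are dropped and processing continues.
  CONSTRAINT positive_fare EXPECT (fare_amount > 0) ON VIOLATION DROP ROW,
  -- FAIL UPDATE: a violating row stops pipeline execution.
  CONSTRAINT valid_trip_id EXPECT (trip_id IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM LIVE.taxi_raw
```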

Change data capture with SQL in Delta Live Tables

Use the APPLY CHANGES INTO statement to access Delta Live Tables CDC functionality. The following describes its syntax:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

You define data quality constraints for an APPLY CHANGES target using the same CONSTRAINT clause as non-APPLY CHANGES queries. See Manage data quality with Delta Live Tables.

Note

The default behavior for INSERT and UPDATE events is to upsert CDC events from the source: update any rows in the target table that match the specified key(s) or insert a new row when a matching record does not exist in the target table. Handling for DELETE events can be specified with the APPLY AS DELETE WHEN condition.
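
A minimal sketch of this behavior for an SCD type 1 target follows. The source cdc_data.users and the target name are hypothetical, as are the operation column and its "DELETE" and "TRUNCATE" values; userId and sequenceNum are assumed to be the key and ordering columns of the change feed:

```sql
-- Declare the target streaming table first.
CREATE OR REFRESH STREAMING TABLE target;

-- INSERT and UPDATE events are upserted by userId; events flagged as
-- DELETE or TRUNCATE are handled by the two APPLY AS clauses.
APPLY CHANGES INTO LIVE.target
FROM STREAM(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
APPLY AS TRUNCATE WHEN operation = "TRUNCATE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 1;
```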

Important

You must declare a target streaming table to apply changes into. You can optionally specify the schema for your target table. When specifying the schema of the APPLY CHANGES target table, you must also include the __START_AT and __END_AT columns with the same data type as the SEQUENCE BY column.

See Simplified change data capture with the APPLY CHANGES API in Delta Live Tables.
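
As a sketch of the schema requirement described above, the following declares a target table with an explicit schema. All column names and types are hypothetical; the example assumes the SEQUENCE BY column in the source is a BIGINT, so __START_AT and __END_AT are declared as BIGINT to match:

```sql
-- Hypothetical target schema. __START_AT and __END_AT use the same data
-- type as the (assumed BIGINT) column named in SEQUENCE BY.
CREATE OR REFRESH STREAMING TABLE target (
  userId INT,
  name STRING,
  city STRING,
  __START_AT BIGINT,
  __END_AT BIGINT
);
```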

Clauses

KEYS

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

This clause is required.

IGNORE NULL UPDATES

Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and IGNORE NULL UPDATES is specified, columns with a null will retain their existing values in the target. This also applies to nested columns with a value of null.

This clause is optional.

The default is to overwrite existing columns with null values.

APPLY AS DELETE WHEN

Specifies when a CDC event should be treated as a DELETE rather than an upsert. To handle out-of-order data, the deleted row is temporarily retained as a tombstone in the underlying Delta table, and a view is created in the metastore that filters out these tombstones. The retention interval can be configured with the pipelines.cdc.tombstoneGCThresholdInSeconds table property.

This clause is optional.

APPLY AS TRUNCATE WHEN

Specifies when a CDC event should be treated as a full table TRUNCATE. Because this clause triggers a full truncate of the target table, it should be used only for specific use cases requiring this functionality.

The APPLY AS TRUNCATE WHEN clause is supported only for SCD type 1. SCD type 2 does not support truncate.

This clause is optional.

SEQUENCE BY

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

This clause is required.

COLUMNS

Specifies a subset of columns to include in the target table. You can either:

  • Specify the complete list of columns to include: COLUMNS (userId, name, city).

  • Specify a list of columns to exclude: COLUMNS * EXCEPT (operation, sequenceNum)

This clause is optional.

The default is to include all columns in the target table when the COLUMNS clause is not specified.

STORED AS

Whether to store records as SCD type 1 or SCD type 2.

This clause is optional.

The default is SCD type 1.

TRACK HISTORY ON

Specifies a subset of output columns to generate history records when there are any changes to those specified columns. You can either:

  • Specify the complete list of columns to track: TRACK HISTORY ON (userId, name, city).

  • Specify a list of columns to be excluded from tracking: TRACK HISTORY ON * EXCEPT (operation, sequenceNum)

This clause is optional. The default is to track history for all the output columns when there are any changes, equivalent to TRACK HISTORY ON *.
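
Several of the clauses above can be combined for an SCD type 2 target, as in the following sketch. The source cdc_data.users, the target name, and the columns userId, operation, sequenceNum, and city are hypothetical:

```sql
CREATE OR REFRESH STREAMING TABLE target;

-- Keep history for every output column except city. Null values in
-- incoming updates leave the existing target values unchanged.
APPLY CHANGES INTO LIVE.target
FROM STREAM(cdc_data.users)
KEYS (userId)
IGNORE NULL UPDATES
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);
```

APPLY AS TRUNCATE WHEN is left out here because it is supported only for SCD type 1.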