Skip to main content

CREATE STREAMING TABLE (DLT)

A streaming table is a table with support for streaming or incremental data processing. Streaming tables defined in a pipeline notebook are backed by a DLT pipeline. Each time an streaming table is refreshed, data added to the source tables is appended to the streaming table. You can refresh streaming tables manually or on a schedule.

To learn more about how to perform or schedule refreshes, see Run an update on a DLT pipeline.

Syntax

CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]

table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )

column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]

table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]

Parameters

  • REFRESH

    If specified, will create the table, or update an existing table and its content.

  • PRIVATE

    Creates a private streaming table.

    • They are not added to the catalog and are only accessible within the defining pipeline
    • They can have the same name as an existing object in the catalog. Within the pipeline, if a private streaming table and an object in the catalog have the same name, references to the name will resolve to the private streaming table.
    • Private streaming tables are only persisted across the lifetime of the pipeline, not just a single update.

    Private streaming tables were previously created with the TEMPORARY parameter.

  • table_name

    The name of the newly created table. The fully qualified table name must be unique.

  • table_specification

    This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.

  • table_constraint

    Preview

    This feature is in Public Preview.

    When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. See the CONSTRAINT clause in the SQL language reference.

    note

    To define table constraints, your pipeline must be a Unity Catalog-enabled pipeline.

  • table_clauses

    Optionally specify partitioning, comments, and user defined properties for the table. Each sub clause may only be specified once.

    • USING DELTA

      Specifies the data format. The only option is DELTA.

      This clause is optional, and defaults to DELTA.

    • PARTITIONED BY

      An optional list of one or more columns to use for partitioning in the table. Mutually exclusive with CLUSTER BY.

      Liquid clustering provides a flexible, optimized solution for clustering. Consider using CLUSTER BY instead of PARTITIONED BY for DLT.

    • CLUSTER BY

      Enable liquid clustering on the table and define the columns to use as clustering keys. Mutually exclusive with PARTITIONED BY.

      See Use liquid clustering for Delta tables.

    • LOCATION

      An optional storage location for table data. If not set, the system will default to the pipeline storage location.

    • COMMENT

      An optional STRING literal to describe the table.

    • TBLPROPERTIES

      An optional list of table properties for the table.

    • WITH ROW FILTER

    Preview

    This feature is in Public Preview.

    Adds a row filter function to the table. Future queries for that table receive a subset of the rows for which the function evaluates to TRUE. This is useful for fine-grained access control, because it allows the function to inspect the identity and group memberships of the invoking user to decide whether to filter certain rows.

    See ROW FILTER clause.

  • query

    This clause populates the table using the data from query. This query must be a streaming query. Use the STREAM keyword to use streaming semantics to read from the source. If the read encounters a change or deletion to an existing record, an error is thrown. It is safest to read from static or append-only sources. To ingest data that has change commits, you can use Python and the SkipChangeCommits option to handle errors.

    When you specify a query and a table_specification together, the table schema specified in table_specification must contain all the columns returned by the query, otherwise you get an error. Any columns specified in table_specification but not returned by query return null values when queried.

    For more information on streaming data, see Transform data with pipelines.

Required permissions

The run-as user for a pipeline must have the following permissions:

  • SELECT privilege over the base tables referenced by the streaming table.
  • USE CATALOG privilege on the parent catalog and the USE SCHEMA privilege on the parent schema.
  • CREATE MATERIALIZED VIEW privilege on the schema for the streaming table.

For a user to be able to update the pipeline the streaming table is defined within, they require:

  • USE CATALOG privilege on the parent catalog and the USE SCHEMA privilege on the parent schema.
  • Ownership of the streaming table or REFRESH privilege on the streaming table.
  • The owner of the streaming table must have the SELECT privilege over the base tables referenced by the streaming table.

For a user to be able to query the resulting streaming table, they require:

  • USE CATALOG privilege on the parent catalog and the USE SCHEMA privilege on the parent schema.
  • SELECT privilege over the streaming table.

Limitations

  • Only table owners can refresh streaming tables to get the latest data.
  • ALTER TABLE commands are disallowed on streaming tables. The definition and properties of the table should be altered through the CREATE OR REFRESH or ALTER STREAMING TABLE statement.
  • Evolving the table schema through DML commands like INSERT INTO, and MERGE is not supported.
  • The following commands are not supported on streaming tables:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Delta Sharing is not supported.
  • Renaming the table or changing the owner is not supported.
  • Generated columns, identity columns, and default columns are not supported.

Examples

SQL
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")

-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)

-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;