CREATE STREAMING TABLE (DLT)
A streaming table is a table with support for streaming or incremental data processing. Streaming tables defined in a pipeline notebook are backed by a DLT pipeline. Each time an streaming table is refreshed, data added to the source tables is appended to the streaming table. You can refresh streaming tables manually or on a schedule.
To learn more about how to perform or schedule refreshes, see Run an update on a DLT pipeline.
Syntax
CREATE [OR REFRESH] [PRIVATE] STREAMING TABLE
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( { column_identifier column_type [column_properties] } [, ...]
[ column_constraint ] [, ...]
[ , table_constraint ] [...] )
column_properties
{ NOT NULL | COMMENT column_comment | column_constraint | MASK clause } [ ... ]
table_clauses
{ USING DELTA
PARTITIONED BY (col [, ...]) |
CLUSTER BY clause |
LOCATION path |
COMMENT view_comment |
TBLPROPERTIES clause |
WITH { ROW FILTER clause } } [ ... ]
Parameters
-
REFRESH
If specified, will create the table, or update an existing table and its content.
-
PRIVATE
Creates a private streaming table.
- They are not added to the catalog and are only accessible within the defining pipeline
- They can have the same name as an existing object in the catalog. Within the pipeline, if a private streaming table and an object in the catalog have the same name, references to the name will resolve to the private streaming table.
- Private streaming tables are only persisted across the lifetime of the pipeline, not just a single update.
Private streaming tables were previously created with the
TEMPORARY
parameter. -
table_name
The name of the newly created table. The fully qualified table name must be unique.
-
table_specification
This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.
-
The column names must be unique and map to the output columns of the query.
-
Specifies the column’s data type. Not all data types supported by Databricks are supported by streaming tables.
-
column_comment
An optional
STRING
literal describing the column. This option must be specified along withcolumn_type
. If the column type is not specified, the column comment is skipped. -
Adds a constraint that validates data as it flows into the table. See Manage data quality with pipeline expectations.
-
Preview
This feature is in Public Preview.
Adds a column mask function to anonymize sensitive data.
See Filter sensitive table data using row filters and column masks.
-
-
table_constraint
PreviewThis feature is in Public Preview.
When specifying a schema, you can define primary and foreign keys. The constraints are informational and are not enforced. See the CONSTRAINT clause in the SQL language reference.
noteTo define table constraints, your pipeline must be a Unity Catalog-enabled pipeline.
-
table_clauses
Optionally specify partitioning, comments, and user defined properties for the table. Each sub clause may only be specified once.
-
USING DELTA
Specifies the data format. The only option is DELTA.
This clause is optional, and defaults to DELTA.
-
PARTITIONED BY
An optional list of one or more columns to use for partitioning in the table. Mutually exclusive with
CLUSTER BY
.Liquid clustering provides a flexible, optimized solution for clustering. Consider using
CLUSTER BY
instead ofPARTITIONED BY
for DLT. -
CLUSTER BY
Enable liquid clustering on the table and define the columns to use as clustering keys. Mutually exclusive with
PARTITIONED BY
. -
LOCATION
An optional storage location for table data. If not set, the system will default to the pipeline storage location.
-
COMMENT
An optional
STRING
literal to describe the table. -
TBLPROPERTIES
An optional list of table properties for the table.
-
WITH ROW FILTER
PreviewThis feature is in Public Preview.
Adds a row filter function to the table. Future queries for that table receive a subset of the rows for which the function evaluates to TRUE. This is useful for fine-grained access control, because it allows the function to inspect the identity and group memberships of the invoking user to decide whether to filter certain rows.
See
ROW FILTER
clause. -
-
This clause populates the table using the data from
query
. This query must be a streaming query. Use the STREAM keyword to use streaming semantics to read from the source. If the read encounters a change or deletion to an existing record, an error is thrown. It is safest to read from static or append-only sources. To ingest data that has change commits, you can use Python and theSkipChangeCommits
option to handle errors.When you specify a
query
and atable_specification
together, the table schema specified intable_specification
must contain all the columns returned by thequery
, otherwise you get an error. Any columns specified intable_specification
but not returned byquery
returnnull
values when queried.For more information on streaming data, see Transform data with pipelines.
Required permissions
The run-as user for a pipeline must have the following permissions:
SELECT
privilege over the base tables referenced by the streaming table.USE CATALOG
privilege on the parent catalog and theUSE SCHEMA
privilege on the parent schema.CREATE MATERIALIZED VIEW
privilege on the schema for the streaming table.
For a user to be able to update the pipeline the streaming table is defined within, they require:
USE CATALOG
privilege on the parent catalog and theUSE SCHEMA
privilege on the parent schema.- Ownership of the streaming table or
REFRESH
privilege on the streaming table. - The owner of the streaming table must have the
SELECT
privilege over the base tables referenced by the streaming table.
For a user to be able to query the resulting streaming table, they require:
USE CATALOG
privilege on the parent catalog and theUSE SCHEMA
privilege on the parent schema.SELECT
privilege over the streaming table.
Limitations
- Only table owners can refresh streaming tables to get the latest data.
ALTER TABLE
commands are disallowed on streaming tables. The definition and properties of the table should be altered through theCREATE OR REFRESH
or ALTER STREAMING TABLE statement.- Evolving the table schema through DML commands like
INSERT INTO
, andMERGE
is not supported. - The following commands are not supported on streaming tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
- Delta Sharing is not supported.
- Renaming the table or changing the owner is not supported.
- Generated columns, identity columns, and default columns are not supported.
Examples
-- Define a streaming table from a volume of files:
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/*", format => "csv")
-- Define a streaming table from a streaming source table:
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
-- Define a table with a row filter and column mask:
CREATE OR REFRESH STREAMING TABLE customers_silver (
id int COMMENT 'This is the customer ID',
name string,
region string,
ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(customers_bronze)
-- Define a streaming table that you can add flows into:
CREATE OR REFRESH STREAMING TABLE orders;