CREATE STREAMING TABLE
Applies to: Databricks SQL
Creates a streaming table, a Delta table with extra support for streaming or incremental data processing.
Streaming tables are only supported in Delta Live Tables and on Databricks SQL with Unity Catalog. Running this command on supported Databricks Runtime compute only parses the syntax. See Develop pipeline code with SQL.
Syntax
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] schedule_clause |
    WITH { ROW FILTER clause } } [...]

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
    CRON cron_string [ AT TIME ZONE timezone_id ] }
Parameters
REFRESH
If specified, refreshes the table with the latest data available from the sources defined in the query. Only new data that arrives before the query starts is processed. New data that gets added to the sources during the execution of the command is ignored until the next refresh. The refresh operation from CREATE OR REFRESH is fully declarative. If a refresh command does not specify all metadata from the original table creation statement, the unspecified metadata is deleted.
IF NOT EXISTS
Creates the streaming table if it does not exist. If a table by this name already exists, the CREATE STREAMING TABLE statement is ignored.
You may specify at most one of IF NOT EXISTS or OR REFRESH.
table_name
The name of the table to be created. The name must not include a temporal specification or options specification. If the name is not qualified, the table is created in the current schema.
table_specification
This optional clause defines the list of columns, their types, properties, descriptions, and column constraints.
If you do not define columns in the table schema, you must specify AS query.
column_identifier
A unique name for the column.
column_type
Specifies the data type of the column.
NOT NULL
If specified, the column does not accept NULL values.
COMMENT column_comment
A string literal to describe the column.
column_constraint
Preview
This feature is in Public Preview.
Adds a primary key or foreign key constraint to the column in a streaming table. Constraints are not supported for tables in the hive_metastore catalog.
MASK clause
Preview
This feature is in Public Preview.
Adds a column mask function to anonymize sensitive data. All subsequent queries from that column receive the result of evaluating that function over the column in place of the column’s original value. This can be useful for fine-grained access control purposes where the function can inspect the identity or group memberships of the invoking user to decide whether to redact the value.
CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Adds data quality expectations to the table. These data quality expectations can be tracked over time and accessed through the streaming table’s event log. A FAIL UPDATE expectation causes processing to fail both when creating the table and when refreshing it. A DROP ROW expectation causes the entire row to be dropped if the expectation is not met.
expectation_expr may be composed of literals, column identifiers within the table, and deterministic, built-in SQL functions or operators except:
Table valued generator functions
Also, expectation_expr must not contain any subquery.
table_constraint
Preview
This feature is in Public Preview.
Adds informational primary key or informational foreign key constraints to a streaming table. Key constraints are not supported for tables in the hive_metastore catalog.
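As an illustration of the two violation policies described above, the following is a minimal sketch that combines them in one table. The table name, constraint names, and source path are hypothetical placeholders, not taken from a real workspace.

```sql
-- Hypothetical names and path, for illustration only.
CREATE OR REFRESH STREAMING TABLE validated_events (
  -- Rows with a missing id are dropped; the update continues.
  CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
  -- Any row with a timestamp before 2000 fails the create or refresh.
  CONSTRAINT recent_ts EXPECT (ts >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
  FROM STREAM read_files(
    's3://bucket/path',
    format => 'csv',
    schema => 'id int, ts timestamp, event string');
```

DROP ROW suits checks where bad records can be safely discarded, while FAIL UPDATE suits checks where a violation indicates that the whole batch should not be trusted.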
table_clauses
Optionally specify partitioning, comments, user defined properties, and a refresh schedule for the new table. Each sub clause may only be specified once.
PARTITIONED BY
An optional list of columns of the table to partition the table by.
COMMENT table_comment
A STRING literal to describe the table.
TBLPROPERTIES
Optionally sets one or more user defined properties.
Use this setting to specify the Delta Live Tables runtime channel used to run this statement. Set the value of the pipelines.channel property to "PREVIEW" or "CURRENT". The default value is "CURRENT". For more information about Delta Live Tables channels, see Delta Live Tables runtime channels.
SCHEDULE [ REFRESH ] schedule_clause
EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }
To schedule a refresh that occurs periodically, use EVERY syntax. If EVERY syntax is specified, the streaming table or materialized view is refreshed periodically at the specified interval based on the provided value, such as HOUR, HOURS, DAY, DAYS, WEEK, or WEEKS. The following table lists accepted integer values for number.
Time unit        Integer value
HOUR or HOURS    1 <= H <= 72
DAY or DAYS      1 <= D <= 31
WEEK or WEEKS    1 <= W <= 8
Note
The singular and plural forms of the included time unit are semantically equivalent.
CRON cron_string [ AT TIME ZONE timezone_id ]
Schedules a refresh using a quartz cron value. Valid time_zone_values are accepted. AT TIME ZONE LOCAL is not supported.
If AT TIME ZONE is absent, the session time zone is used. If AT TIME ZONE is absent and the session time zone is not set, an error is thrown. SCHEDULE is semantically equivalent to SCHEDULE REFRESH.
The schedule can be provided as part of the CREATE command. To alter the schedule of a streaming table after creation, use ALTER STREAMING TABLE or run the CREATE OR REFRESH command with a SCHEDULE clause.
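To make the CRON form concrete, here is a minimal sketch of a cron-based schedule with an explicit time zone. The table names are hypothetical, and the cron string assumes the standard quartz field order (seconds, minutes, hours, day of month, month, day of week).

```sql
-- Hypothetical table names, for illustration only.
-- Quartz fields: second minute hour day-of-month month day-of-week
-- '0 0 6 * * ?' fires at 06:00:00 every day in the given time zone.
CREATE OR REFRESH STREAMING TABLE daily_events
  SCHEDULE CRON '0 0 6 * * ?' AT TIME ZONE 'America/Los_Angeles'
  AS SELECT *
  FROM STREAM source_events;
```

Naming the time zone explicitly avoids depending on the session time zone, which raises an error when it is not set.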
WITH ROW FILTER clause
Preview
This feature is in Public Preview.
Adds a row filter function to the table. All subsequent queries from that table receive a subset of the rows where the function evaluates to boolean TRUE. This can be useful for fine-grained access control purposes where the function can inspect the identity or group memberships of the invoking user to decide whether to filter certain rows.
AS query
This clause populates the table using the data from query. This query must be a streaming query. This can be achieved by adding the STREAM keyword to any relation you want to process incrementally. When you specify a query and a table_specification together, the table schema specified in table_specification must contain all the columns returned by the query; otherwise you get an error. Any columns specified in table_specification but not returned by query return null values when queried.
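The interaction between table_specification and AS query can be sketched as follows. The table name, the extra note column, and the source path are hypothetical. The sketch only illustrates the rule stated above that a declared column missing from the query reads as null.

```sql
-- Hypothetical names and path, for illustration only.
CREATE OR REFRESH STREAMING TABLE events_with_note (
  id int,
  ts timestamp,
  event string,
  note string  -- declared here but not returned by the query, so it reads as NULL
)
AS SELECT id, ts, event
  FROM STREAM read_files(
    's3://bucket/path',
    format => 'csv',
    schema => 'id int, ts timestamp, event string');
```

The reverse case, where the query returns a column that table_specification does not declare, is the one that produces an error.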
Differences between streaming tables and other tables
Streaming tables are stateful tables, designed to handle each row only once as you process a growing dataset. Because most datasets grow continuously over time, streaming tables are good for most ingestion workloads. Streaming tables are optimal for pipelines that require data freshness and low latency. Streaming tables can also be useful for massive scale transformations, as results can be incrementally calculated as new data arrives, keeping results up to date without needing to fully recompute all source data with each update. Streaming tables are designed for data sources that are append-only.
Streaming tables accept additional commands such as REFRESH, which processes the latest data available in the sources provided in the query. Changes to the provided query are reflected only in new data processed by a subsequent REFRESH, not in previously processed data. To apply the changes to existing data as well, execute REFRESH TABLE <table_name> FULL to perform a FULL REFRESH. Full refreshes re-process all data available in the source with the latest definition. Full refreshes are not recommended on sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka, because the full refresh truncates the existing data. You may not be able to recover old data if the data is no longer available in the source.
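The difference between an incremental and a full refresh can be sketched as two statements. The table name raw_data is a placeholder. The STREAMING keyword is written out here on the assumption that it is an accepted optional part of the REFRESH TABLE form mentioned above.

```sql
-- Incremental refresh: processes only data that arrived since the last update,
-- using the current query definition.
REFRESH STREAMING TABLE raw_data;

-- Full refresh: truncates the table and re-processes everything still
-- available in the source. Avoid this on short-retention sources such as Kafka.
REFRESH STREAMING TABLE raw_data FULL;
```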
Row filters and column masks
Preview
This feature is in Public Preview.
Row filters let you specify a function that applies as a filter whenever a table scan fetches rows. These filters ensure that subsequent queries only return rows for which the filter predicate evaluates to true.
Column masks let you mask a column’s values whenever a table scan fetches rows. All future queries involving that column will receive the result of evaluating the function over the column, replacing the column’s original value.
For more information on how to use row filters and column masks, see Filter sensitive table data using row filters and column masks.
Managing Row Filters and Column Masks
Row filters and column masks on streaming tables should be added, updated, or dropped through the CREATE OR REFRESH statement.
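As a rough sketch of that workflow, the statements below define a filter function and a mask function and then attach them with CREATE OR REFRESH. The function names, group names, table name, and path are hypothetical, and the function bodies are only one possible policy.

```sql
-- Hypothetical functions and group names, for illustration only.
-- Row filter: members of 'admin' see all rows; everyone else sees only US rows.
CREATE FUNCTION us_filter_fn(region STRING)
  RETURN IS_MEMBER('admin') OR region = 'US';

-- Column mask: members of 'hr' see the real value; everyone else sees a redacted one.
CREATE FUNCTION ssn_mask_fn(ssn STRING)
  RETURN CASE WHEN IS_MEMBER('hr') THEN ssn ELSE '***-**-****' END;

-- Attach both when creating or refreshing the streaming table.
CREATE OR REFRESH STREAMING TABLE masked_people (
  id int,
  name string,
  region string,
  ssn string MASK ssn_mask_fn
)
WITH ROW FILTER us_filter_fn ON (region)
AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data');
```

Because CREATE OR REFRESH is declarative, rerunning the statement without the MASK or WITH ROW FILTER clause would be expected to remove that policy, so keep the full definition in one place.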
Behavior
Refresh as Definer: When the CREATE OR REFRESH or REFRESH statements refresh a streaming table, row filter functions run with the definer’s rights (as the table owner). This means the table refresh uses the security context of the user who created the streaming table.
Query: While most filters run with the definer’s rights, functions that check user context (such as CURRENT_USER and IS_MEMBER) are exceptions. These functions run as the invoker. This approach enforces user-specific data security and access controls based on the current user’s context.
Limitations
Only table owners can refresh streaming tables to get the latest data.
ALTER TABLE commands are disallowed on streaming tables. The definition and properties of the table should be altered through the CREATE OR REFRESH or ALTER STREAMING TABLE statement.
Evolving the table schema through DML commands like INSERT INTO and MERGE is not supported.
The following commands are not supported on streaming tables:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Delta Sharing is not supported.
Renaming the table or changing the owner is not supported.
Table constraints such as PRIMARY KEY and FOREIGN KEY are not supported.
Generated columns, identity columns, and default columns are not supported.
Examples
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM RANGE(10);
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data');