Use streaming tables in Databricks SQL
This feature is in Public Preview.
Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A DLT pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.
To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.
Requirements
To use streaming tables, you must meet the following requirements.
Workspace requirements:
Streaming tables created in Databricks SQL are backed by a serverless DLT pipeline. Your workspace must support serverless pipelines to use this functionality.
- A Databricks account with serverless enabled. For more information, see Enable serverless SQL warehouses.
- A workspace with Unity Catalog enabled. For more information, see Set up and manage Unity Catalog.
Compute requirements:
You must use one of the following:
- A SQL warehouse that uses the Current channel.
- Compute with standard access mode (formerly shared access mode) on Databricks Runtime 13.3 LTS or above.
Permissions requirements:
- USE CATALOG and USE SCHEMA privileges on the catalog and schema in which you create the streaming table.
- The CREATE TABLE privilege on the schema in which you create the streaming table.
- Privileges for accessing the tables or locations providing the source data for your streaming table.
Create streaming tables
A streaming table is defined by a SQL query in Databricks SQL. When you create a streaming table, the data currently in the source tables is used to build the streaming table. After that, you refresh the table, usually on a schedule, to pull in new data added to the source tables and append it to the streaming table.
When you create a streaming table, you are considered the owner of the table.
To create a streaming table from an existing table, use the CREATE STREAMING TABLE statement, as in the following example:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
In this case, the streaming table sales is created from specific columns of the raw_data table, with a schedule to refresh every hour. The query used must be a streaming query. Use the STREAM keyword to apply streaming semantics when reading from the source.
When you create a streaming table using the CREATE OR REFRESH STREAMING TABLE statement, the initial data refresh and population begin immediately. These operations do not consume DBSQL warehouse compute. Instead, streaming tables rely on serverless DLT for both creation and refresh. A dedicated serverless DLT pipeline is automatically created and managed by the system for each streaming table.
Load files with Auto Loader
To create a streaming table from files in a volume, you use Auto Loader. Use Auto Loader with DLT for most data ingestion tasks from cloud object storage. Auto Loader and DLT are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage.
To use Auto Loader in Databricks SQL, use the read_files function. The following example shows using Auto Loader to read a volume of JSON files into a streaming table:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
To read data from cloud storage, you can also use Auto Loader:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'gs://mybucket/analysis/*/*/*.json',
format => "json"
);
To learn about Auto Loader, see What is Auto Loader?. To learn more about using Auto Loader in SQL, with examples, see Load data from object storage.
Streaming ingestion from other sources
For examples of ingestion from other sources, including Kafka, see Load data with DLT.
Ingest new data only
By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. This means that only data that arrives in the directory after table creation is processed. For example:
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
Set the runtime channel
Streaming tables created using SQL warehouses are automatically refreshed using a DLT pipeline. DLT pipelines use the runtime in the current channel by default. See DLT release notes and the release upgrade process to learn about the release process.
Databricks recommends using the current channel for production workloads. New features are first released to the preview channel. You can set a pipeline to the preview DLT channel to test new features by specifying preview as a table property. You can specify this property when you create the table or after the table is created using an ALTER statement.
The following code example shows how to set the channel to preview in a CREATE statement:
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
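To set the channel after the table is created, one possible approach is to update the same table property with an ALTER statement. The following is a sketch; see ALTER STREAMING TABLE for the exact supported syntax:

ALTER STREAMING TABLE sales
SET TBLPROPERTIES ('pipelines.channel' = 'preview');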
Hide sensitive data
This feature is in Public Preview.
You can use streaming tables to hide sensitive data from users accessing the table. One approach is to define the query so that it excludes sensitive columns or rows entirely. Alternatively, you can apply column masks or row filters based on the permissions of the querying user. For example, you could hide the tax_id column for users who are not in the group HumanResourcesDept. To do this, use the ROW FILTER and MASK syntax during the creation of the streaming table. For more information, see Filter sensitive table data using row filters and column masks.
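As a sketch of the tax_id scenario, a column mask might look like the following. The raw_employees source table and the hide_tax_id function are hypothetical; see the linked article for the full clause syntax:

-- Hypothetical masking function: reveals tax_id only to HumanResourcesDept members.
CREATE FUNCTION hide_tax_id(tax_id STRING)
RETURNS STRING
RETURN CASE WHEN is_account_group_member('HumanResourcesDept') THEN tax_id ELSE '***' END;

CREATE OR REFRESH STREAMING TABLE employees (
  name STRING,
  tax_id STRING MASK hide_tax_id  -- apply the mask to the sensitive column
)
SCHEDULE EVERY 1 hour
AS SELECT name, tax_id FROM STREAM raw_employees;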
Refresh a streaming table
Refreshes can be scheduled automatically when you create the streaming table. You can also manually refresh streaming tables. Even if you have a scheduled refresh, you can call a manual refresh at any time. Refreshes are handled by the same pipeline that was automatically created along with the streaming table.
To refresh a streaming table:
REFRESH STREAMING TABLE sales;
You can check the status of the latest refresh with DESCRIBE TABLE EXTENDED.
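For example, using the sales table from the earlier examples:

DESCRIBE TABLE EXTENDED sales;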
Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed. You might need to refresh your streaming table before using time travel queries.
How refresh works
A streaming table refresh only evaluates new rows that have arrived since the last update, and appends only the new data.
Each refresh uses the current definition of the streaming table to process this new data. Modifying a streaming table definition does not automatically recalculate existing data. If a modification is incompatible with existing data (e.g., changing a data type), the next refresh will fail with an error.
The following examples explain how changes to a streaming table definition affect refresh behavior:
- Removing a filter will not reprocess previously filtered rows.
- Changing column projections won't affect how existing data was processed.
- Joins with static snapshots use the snapshot state at the time of the initial processing. Late-arriving data that would have matched with the updated snapshot will be ignored. This can lead to facts being dropped if dimensions are late.
- Modifying the CAST of an existing column will result in an error.
If your data changes in a way that cannot be supported in the existing streaming table, you can perform a full refresh.
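For instance, the incompatible CAST change described above might look like the following sketch, reusing the sales and raw_data names from earlier examples:

-- Original definition casts price to INT.
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT product, CAST(price AS INT) AS price FROM STREAM raw_data;

-- Redefining the column with a different CAST is incompatible with data
-- already in the table, so the next refresh fails. A full refresh (described
-- in the next section) reprocesses the source with the new definition.
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT product, CAST(price AS STRING) AS price FROM STREAM raw_data;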
Fully refresh a streaming table
Full refreshes re-process all data available in the source with the latest definition. It is not recommended to call full refreshes on sources that don’t keep the entire history of the data or have short retention periods, such as Kafka, because the full refresh truncates the existing data. You might not be able to recover old data if the data is no longer available in the source.
For example:
REFRESH STREAMING TABLE sales FULL;
Change the schedule for a streaming table
You can modify (or set) an automatic refresh schedule for your streaming table. The following example shows you how to set a schedule using ALTER STREAMING TABLE:
ALTER STREAMING TABLE sales
ADD SCHEDULE EVERY 1 hour;
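A schedule can also be removed. As a sketch, assuming the sales table from earlier examples:

ALTER STREAMING TABLE sales
DROP SCHEDULE;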
For example refresh schedule queries, see ALTER STREAMING TABLE.
Track the status of a refresh
You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the DLT UI or by viewing the Refresh Information returned by the DESCRIBE TABLE EXTENDED command for the streaming table.
DESCRIBE TABLE EXTENDED <table-name>;
Alternately, you can view the streaming table in Catalog Explorer and see the refresh status there:
- Click Catalog in the sidebar.
- In the Catalog Explorer tree at the left, open the catalog and select the schema where your streaming table is located.
- Open the Tables item under the schema you selected, and click the streaming table.
From here, you can use the tabs under the streaming table name to view and edit information about the streaming table, including:
- Refresh status and history
- The table schema
- Sample data (requires an active compute)
- Permissions
- Lineage, including tables and pipelines that this streaming table depends on
- Insights into usage
- Monitors that you have created for this streaming table
Control access to streaming tables
Streaming tables support rich access controls so that data can be shared without exposing potentially private data. A streaming table owner or a user with the MANAGE privilege can grant SELECT privileges to other users. Users with SELECT access to the streaming table do not need SELECT access to the tables referenced by the streaming table. This access control enables data sharing while controlling access to the underlying data.
Grant privileges to a streaming table
To grant access to a streaming table, use the GRANT statement:
GRANT <privilege_type> ON <st_name> TO <principal>;
The privilege_type can be:
- SELECT - the user can SELECT the streaming table.
- REFRESH - the user can REFRESH the streaming table. Refreshes are run using the owner’s permissions.
The following example creates a streaming table and grants select and refresh privileges to users:
CREATE OR REFRESH STREAMING TABLE st_name AS SELECT * FROM STREAM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grant read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.
Revoke privileges from a streaming table
To revoke access from a streaming table, use the REVOKE statement:
REVOKE <privilege_type> ON <st_name> FROM <principal>;
When SELECT privileges on a source table are revoked from the streaming table owner or any other user who has been granted MANAGE or SELECT privileges on the streaming table, or the source table is dropped, the streaming table owner or user granted access is still able to query the streaming table. However, the following behavior occurs:
- The streaming table owner or others who have lost access to a streaming table can no longer REFRESH that streaming table, and the streaming table will become stale.
- If automated with a schedule, the next scheduled REFRESH fails or is not run.
The following example revokes the SELECT privilege from read_only_user:
REVOKE SELECT ON st_name FROM read_only_user;
Permanently delete records from a streaming table
Support for the REORG statement with streaming tables is in Public Preview.
- Using a REORG statement with a streaming table requires Databricks Runtime 15.4 and above.
- Although you can use the REORG statement with any streaming table, it’s only required when deleting records from a streaming table with deletion vectors enabled. The command has no effect when used with a streaming table without deletion vectors enabled.
To physically delete records from the underlying storage for a streaming table with deletion vectors enabled, such as for GDPR compliance, additional steps must be taken to ensure that a VACUUM operation runs on the streaming table’s data.
To physically delete records from underlying storage:
- Update records or delete records from the streaming table.
- Run a REORG statement against the streaming table, specifying the APPLY (PURGE) parameter. For example: REORG TABLE <streaming-table-name> APPLY (PURGE);
- Wait for the streaming table’s data retention period to pass. The default data retention period is seven days, but it can be configured with the delta.deletedFileRetentionDuration table property. See Configure data retention for time travel queries.
- REFRESH the streaming table. See Refresh a streaming table. Within 24 hours of the REFRESH operation, DLT maintenance tasks, including the VACUUM operation which is required to ensure records are permanently deleted, are run automatically.
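Putting these steps together, the sequence might look like the following sketch. The sales table and the customer_id predicate are hypothetical:

-- 1. Delete the records that must be permanently removed.
DELETE FROM sales WHERE customer_id = 'C123';

-- 2. Purge the deleted rows from the underlying data files.
REORG TABLE sales APPLY (PURGE);

-- 3. After the retention period has passed, refresh the table. Maintenance
--    tasks, including VACUUM, run within 24 hours of the refresh.
REFRESH STREAMING TABLE sales;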
Monitor runs using query history
You can use the query history page to access query details and query profiles that can help you identify poorly performing queries and bottlenecks in the DLT pipeline used to run your streaming table updates. For an overview of the kind of information available in query histories and query profiles, see Query history and Query profile.
This feature is in Public Preview. Workspace admins can enable this feature from the Previews page. See Manage Databricks Previews.
All statements related to streaming tables appear in the query history. You can use the Statement drop-down filter to select any command and inspect the related queries. All CREATE statements are followed by a REFRESH statement that executes asynchronously on a DLT pipeline. The REFRESH statements typically include detailed query plans that provide insights into optimizing performance.
To access REFRESH statements in the query history UI, use the following steps:
- Click Query History in the left sidebar to open the Query History UI.
- Select the REFRESH checkbox from the Statement drop-down filter.
- Click the name of the query statement to view summary details like the duration of the query and aggregated metrics.
- Click See query profile to open the query profile. See Query profile for details about navigating the query profile.
- Optionally, you can use the links in the Query Source section to open the related query or pipeline.
You can also access query details using links in the SQL editor or from a notebook attached to a SQL warehouse.