Use streaming tables in Databricks SQL

Preview

This feature is in Public Preview.

Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A DLT pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.

note

To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.

Requirements

To use streaming tables, you must meet the following requirements.

Workspace requirements:

Streaming tables created in Databricks SQL are backed by a serverless DLT pipeline. Your workspace must support serverless pipelines to use this functionality.

Compute requirements:

You must use one of the following:

  • A SQL warehouse that uses the Current channel.
  • Compute with standard access mode (formerly shared access mode) on Databricks Runtime 13.3 LTS or above.

Permissions requirements:

  • USE CATALOG and USE SCHEMA privileges on the catalog and schema in which you create the streaming table.
  • The CREATE TABLE privilege on the schema in which you create the streaming table.
  • Privileges for accessing the tables or locations providing the source data for your streaming table.

Create streaming tables

A streaming table is defined by a SQL query in Databricks SQL. When you create a streaming table, the data currently in the source tables is used to build it. After that, you refresh the table, usually on a schedule, to append any new data in the source tables to the streaming table.

When you create a streaming table, you are considered the owner of the table.

To create a streaming table from an existing table, use the CREATE STREAMING TABLE statement, as in the following example:

SQL
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;

In this case, the streaming table sales is created from specific columns of the raw_data table, with a schedule to refresh every hour. The query used must be a streaming query. Use the STREAM keyword to read from the source with streaming semantics.

When you create a streaming table using the CREATE OR REFRESH STREAMING TABLE statement, the initial data refresh and population begin immediately. These operations do not consume DBSQL warehouse compute. Instead, streaming tables rely on serverless DLT for both creation and refresh. A dedicated serverless DLT pipeline is automatically created and managed by the system for each streaming table.

Load files with Auto Loader

To create a streaming table from files in a volume, you use Auto Loader. Use Auto Loader with DLT for most data ingestion tasks from cloud object storage. Auto Loader and DLT are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage.

To use Auto Loader in Databricks SQL, use the read_files function. The following example shows how to use Auto Loader to read a volume of JSON files into a streaming table:

SQL
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/my_schema/my_volume/path/to/data",
  format => "json"

You can also use Auto Loader to read data directly from cloud storage:

SQL
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
  'gs://mybucket/analysis/*/*/*.json',
  format => "json"
);

To learn about Auto Loader, see What is Auto Loader?. To learn more about using Auto Loader in SQL, with examples, see Load data from object storage.

Streaming ingestion from other sources

For examples of ingestion from other sources, including Kafka, see Load data with DLT.

Ingest new data only

By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.

To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. This means that only data that arrives in the directory after table creation is processed. For example:

SQL
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
  '/path/to/files',
  includeExistingFiles => false
);

Set the runtime channel

Streaming tables created using SQL warehouses are automatically refreshed using a DLT pipeline. DLT pipelines use the runtime in the current channel by default. See DLT release notes and the release upgrade process to learn about the release process.

Databricks recommends using the current channel for production workloads. New features are first released to the preview channel. You can set a pipeline to the preview DLT channel to test new features by setting the pipelines.channel table property to preview. You can specify this property when you create the table or change it after creation using an ALTER statement.

The following code example shows how to set the channel to preview in a CREATE statement:

SQL
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
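
To change the channel after the table is created, you can set the same property with an ALTER statement. The following is a minimal sketch; it assumes that ALTER STREAMING TABLE supports the SET TBLPROPERTIES clause in your environment:

SQL
-- Assumed syntax: switch the backing pipeline to the preview channel.
ALTER STREAMING TABLE sales
SET TBLPROPERTIES ('pipelines.channel' = 'preview');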

Hide sensitive data

Preview

This feature is in Public Preview.

You can use streaming tables to hide sensitive data from users accessing the table. One approach is to define the query so that it excludes sensitive columns or rows entirely. Alternatively, you can apply column masks or row filters based on the permissions of the querying user. For example, you could hide the tax_id column for users who are not in the group HumanResourcesDept. To do this, use the ROW FILTER and MASK syntax during the creation of the streaming table. For more information, see Filter sensitive table data using row filters and column masks.
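
For example, the following is a minimal sketch of the column mask approach. The hide_tax_id function, the employees table, and the raw_employees source are illustrative assumptions, not names from this guide:

SQL
-- Illustrative masking function: reveal tax_id only to HumanResourcesDept members.
CREATE OR REPLACE FUNCTION hide_tax_id(tax_id STRING)
RETURNS STRING
RETURN CASE
  WHEN is_account_group_member('HumanResourcesDept') THEN tax_id
  ELSE '***-**-****'
END;

-- Apply the mask to the tax_id column when creating the streaming table.
CREATE OR REFRESH STREAMING TABLE employees (
  name STRING,
  tax_id STRING MASK hide_tax_id
)
AS SELECT name, tax_id FROM STREAM raw_employees;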

Refresh a streaming table

Refreshes can be scheduled automatically when you create the streaming table. You can also manually refresh streaming tables. Even if you have a scheduled refresh, you can call a manual refresh at any time. Refreshes are handled by the same pipeline that was automatically created along with the streaming table.

To refresh a streaming table:

SQL
REFRESH STREAMING TABLE sales;

You can check the status of the latest refresh with DESCRIBE TABLE EXTENDED.

note

Only the table owner can refresh a streaming table to get the latest data. The user who creates the table is the owner, and the owner can’t be changed. You might need to refresh your streaming table before using time travel queries.
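
For example, a sketch of a time travel read against the sales table from earlier examples (the timestamp is illustrative):

SQL
-- Illustrative: query the table as of a past point in time after refreshing it.
SELECT * FROM sales TIMESTAMP AS OF '2025-06-01T00:00:00Z';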

How refresh works

A streaming table refresh evaluates only the new rows that have arrived since the last update and appends them to the streaming table.

Each refresh uses the current definition of the streaming table to process this new data. Modifying a streaming table definition does not automatically recalculate existing data. If a modification is incompatible with existing data (e.g., changing a data type), the next refresh will fail with an error.

The following examples explain how changes to a streaming table definition affect refresh behavior:

  • Removing a filter will not reprocess previously filtered rows.
  • Changing column projections won't affect how existing data was processed.
  • Joins with static snapshots use the snapshot state at the time of the initial processing. Late-arriving data that would have matched with the updated snapshot will be ignored. This can lead to facts being dropped if dimensions are late.
  • Modifying the CAST of an existing column will result in an error.

If your data changes in a way that cannot be supported in the existing streaming table, you can perform a full refresh.
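
As an illustrative sketch building on the sales example (the type change here is an assumption for demonstration), redefining an existing column with a different type causes the next incremental refresh to fail, so you would rebuild the table with a full refresh, described in the next section:

SQL
-- Redefine price with a type that is incompatible with the existing data:
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT product, CAST(price AS DECIMAL(10, 2)) AS price FROM STREAM raw_data;

-- The next incremental refresh fails, so reprocess all available source data:
REFRESH STREAMING TABLE sales FULL;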

Fully refresh a streaming table

Full refreshes reprocess all data available in the source with the latest definition. Avoid calling full refreshes on sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka, because a full refresh truncates the existing data. You might not be able to recover old data if it is no longer available in the source.

For example:

SQL
REFRESH STREAMING TABLE sales FULL;

Change the schedule for a streaming table

You can modify (or set) an automatic refresh schedule for your streaming table. The following example shows how to set a schedule using ALTER STREAMING TABLE:

SQL
ALTER STREAMING TABLE sales
ADD SCHEDULE EVERY 1 hour;
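
You can also replace or remove an existing schedule. The following is a minimal sketch using the ALTER SCHEDULE and DROP SCHEDULE clauses documented for ALTER STREAMING TABLE:

SQL
-- Replace the existing schedule:
ALTER STREAMING TABLE sales
ALTER SCHEDULE EVERY 12 hours;

-- Remove the schedule entirely:
ALTER STREAMING TABLE sales
DROP SCHEDULE;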

For example refresh schedule queries, see ALTER STREAMING TABLE.

Track the status of a refresh

You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the DLT UI or by viewing the Refresh Information returned by the DESCRIBE TABLE EXTENDED command for the streaming table.

SQL
DESCRIBE TABLE EXTENDED <table-name>;

Alternatively, you can view the streaming table in Catalog Explorer and see the refresh status there:

  1. Click Catalog in the sidebar.
  2. In the Catalog Explorer tree at the left, open the catalog and select the schema where your streaming table is located.
  3. Open the Tables item under the schema you selected, and click the streaming table.

From here, you can use the tabs under the streaming table name to view and edit information about the streaming table, including:

  • Refresh status and history
  • The table schema
  • Sample data (requires active compute)
  • Permissions
  • Lineage, including tables and pipelines that this streaming table depends on
  • Insights into usage
  • Monitors that you have created for this streaming table

Control access to streaming tables

Streaming tables support rich access controls, enabling you to share data without exposing potentially private data. A streaming table owner or a user with the MANAGE privilege can grant SELECT privileges to other users. Users with SELECT access to a streaming table do not need SELECT access to the tables it references, which lets you share derived data while controlling access to the underlying sources.

Grant privileges to a streaming table

To grant access to a streaming table, use the GRANT statement:

SQL
GRANT <privilege_type> ON <st_name> TO <principal>;

The privilege_type can be:

  • SELECT - the user can SELECT the streaming table.
  • REFRESH - the user can REFRESH the streaming table. Refreshes are run using the owner’s permissions.

The following example creates a streaming table and grants SELECT and REFRESH privileges to users:

SQL
CREATE OR REFRESH STREAMING TABLE st_name AS SELECT * FROM STREAM source_table;

-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;

-- Grant read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;

For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.

Revoke privileges from a streaming table

To revoke access from a streaming table, use the REVOKE statement:

SQL
REVOKE <privilege_type> ON <st_name> FROM <principal>;

If SELECT privileges on a source table are revoked from the streaming table owner, or from any other user who was granted MANAGE or SELECT privileges on the streaming table, or if the source table is dropped, the affected users can still query the streaming table. However, the following behavior occurs:

  • The streaming table owner or others who have lost access to a streaming table can no longer REFRESH that streaming table, and the streaming table will become stale.
  • If a refresh schedule is set, the next scheduled REFRESH fails or does not run.

The following example revokes the SELECT privilege from read_only_user:

SQL
REVOKE SELECT ON st_name FROM read_only_user;

Permanently delete records from a streaming table

Preview

Support for the REORG statement with streaming tables is in Public Preview.

note
  • Using a REORG statement with a streaming table requires Databricks Runtime 15.4 and above.
  • Although you can use the REORG statement with any streaming table, it’s only required when deleting records from a streaming table that has deletion vectors enabled. The command has no effect on a streaming table without deletion vectors enabled.

To physically delete records from the underlying storage for a streaming table with deletion vectors enabled, such as for GDPR compliance, additional steps must be taken to ensure that a VACUUM operation runs on the streaming table’s data.

To physically delete records from underlying storage:

  1. Update records or delete records from the streaming table.
  2. Run a REORG statement against the streaming table, specifying the APPLY (PURGE) parameter. For example, REORG TABLE <streaming-table-name> APPLY (PURGE);
  3. Wait for the streaming table’s data retention period to pass. The default data retention period is seven days, but it can be configured with the delta.deletedFileRetentionDuration table property. See Configure data retention for time travel queries.
  4. REFRESH the streaming table. See Refresh a streaming table. Within 24 hours of the REFRESH operation, DLT maintenance tasks, including the VACUUM operation which is required to ensure records are permanently deleted, are run automatically.
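
A minimal end-to-end sketch of these steps, using the sales table from earlier examples (the DELETE predicate is illustrative, and the retention wait happens between steps 2 and 3):

SQL
-- 1. Delete the records that must be permanently removed:
DELETE FROM sales WHERE product = 'discontinued-widget';

-- 2. Rewrite the table files and purge soft-deleted records:
REORG TABLE sales APPLY (PURGE);

-- 3. After the retention period (seven days by default) has passed, refresh;
--    maintenance tasks, including VACUUM, run within 24 hours of the refresh:
REFRESH STREAMING TABLE sales;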

Monitor runs using query history

You can use the query history page to access query details and query profiles that can help you identify poorly performing queries and bottlenecks in the DLT pipeline used to run your streaming table updates. For an overview of the kind of information available in query histories and query profiles, see Query history and Query profile.

Preview

This feature is in Public Preview. Workspace admins can enable this feature from the Previews page. See Manage Databricks Previews.

All statements related to streaming tables appear in the query history. You can use the Statement drop-down filter to select any command and inspect the related queries. All CREATE statements are followed by a REFRESH statement that executes asynchronously on a DLT pipeline. The REFRESH statements typically include detailed query plans that provide insights into optimizing performance.

To access REFRESH statements in the query history UI, use the following steps:

  1. Click Query History in the left sidebar to open the Query History UI.
  2. Select the REFRESH checkbox from the Statement drop-down filter.
  3. Click the name of the query statement to view summary details like the duration of the query and aggregated metrics.
  4. Click See query profile to open the query profile. See Query profile for details about navigating the query profile.
  5. Optionally, you can use the links in the Query Source section to open the related query or pipeline.

You can also access query details using links in the SQL editor or from a notebook attached to a SQL warehouse.
