Load data using streaming tables in Databricks SQL
Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A Delta Live Tables pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.
This article demonstrates using streaming tables to load data from cloud object storage configured as a Unity Catalog volume (recommended) or external location.
Note
To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.
Important
Streaming tables created in Databricks SQL are backed by a serverless Delta Live Tables pipeline. Your workspace must support serverless pipelines to use this functionality.
Before you begin
You must meet the following requirements before you create a streaming table.
Workspace requirements:
A Databricks account with serverless enabled. For more information, see Enable serverless SQL warehouses.
A workspace with Unity Catalog enabled. For more information, see Set up and manage Unity Catalog.
Compute requirements:
You must use one of the following:
A SQL warehouse that uses the Current channel.
Compute with shared access mode on Databricks Runtime 13.3 LTS or above.
Compute with single user access mode on Databricks Runtime 15.4 LTS or above.
On Databricks Runtime 15.3 and below, you cannot use single user compute to query streaming tables that are owned by other users. You can use single user compute on Databricks Runtime 15.3 and below only if you own the streaming table. The creator of the table is the owner.
Databricks Runtime 15.4 LTS and above support queries on Delta Live Tables-generated tables on single user compute, regardless of table ownership. To take advantage of the data filtering provided in Databricks Runtime 15.4 LTS and above, you must confirm that your workspace is enabled for serverless compute because the data filtering functionality that supports Delta Live Tables-generated tables runs on serverless compute. You could be charged for serverless compute resources when you use single user compute to run data filtering operations. See Fine-grained access control on single user compute.
Permissions requirements:
The READ FILES privilege on a Unity Catalog external location. For information, see Create an external location to connect cloud storage to Databricks.
The USE CATALOG privilege on the catalog in which you create the streaming table.
The USE SCHEMA privilege on the schema in which you create the streaming table.
The CREATE TABLE privilege on the schema in which you create the streaming table.
Other requirements:
The path to your source data.
Volume path example:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
External location path example:
s3://myBucket/analysis
Note
This article assumes the data you want to load is in a cloud storage location that corresponds to a Unity Catalog volume or external location that you have access to.
Discover and preview source data
In the sidebar of your workspace, click Queries, and then click Create query.
In the query editor, select a SQL warehouse that uses the Current channel from the drop-down list.
Paste the following into the editor, substituting values in angle brackets (<>) for the information identifying your source data, and then click Run.
Note
You might encounter schema inference errors when running the read_files table-valued function if the defaults for the function can’t parse your data. For example, you might need to configure multi-line mode for multi-line CSV or JSON files. For a list of parser options, see read_files table-valued function.
/* Discover your data in a volume */
LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
/* Preview your data in a volume */
SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
/* Discover your data in an external location */
LIST "s3://<bucket>/<path>/<folder>"
/* Preview your data */
SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
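If the preview query fails with a schema inference error, passing parser options to read_files explicitly may help. The following is a minimal sketch that assumes the source folder holds JSON files whose records span multiple lines. The format and multiLine values are assumptions about your data, and the path placeholders must be replaced as in the queries above.

```sql
/* Preview multi-line JSON files in a volume (assumes JSON input) */
SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>',
  format => 'json',   -- assumption: the files are JSON
  multiLine => true   -- assumption: each record spans several lines
) LIMIT 10
```

The same options can be passed to read_files in the CREATE OR REFRESH STREAMING TABLE statement once the preview returns the expected columns.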
Load data into a streaming table
To create a streaming table from data in cloud object storage, paste the following into the query editor, and then click Run:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')
Set the runtime channel
Streaming tables created using SQL warehouses are automatically refreshed using a Delta Live Tables pipeline. Delta Live Tables pipelines use the runtime in the current channel by default. See Delta Live Tables release notes and the release upgrade process to learn about the release process.
Databricks recommends using the current channel for production workloads. New features are first released to the preview channel. You can set a pipeline to the preview Delta Live Tables channel to test new features by specifying preview as a table property. You can specify this property when you create the table or after the table is created using an ALTER statement.
The following code example shows how to set the channel to preview in a CREATE statement:
CREATE OR REFRESH STREAMING TABLE foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
Refresh a streaming table using a DLT pipeline
This section describes patterns for refreshing a streaming table with the latest data available from the sources defined in the query.
When you CREATE or REFRESH a streaming table, the update is processed by a serverless Delta Live Tables pipeline. Each streaming table you define has an associated Delta Live Tables pipeline.
After you run the REFRESH command, a link to the DLT pipeline is returned. You can use this link to check the status of the refresh.
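As a brief illustration, a manual refresh of a streaming table might look like the following. The table name my_bronze_table is only an example name, reused from the examples later in this article.

```sql
/* Process data that has arrived in the source since the last update */
REFRESH STREAMING TABLE my_bronze_table
```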
Note
Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed. You might need to refresh your streaming table before using time travel queries.
See What is Delta Live Tables?.
Ingest new data only
By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. This means that only data that arrives in the directory after table creation is processed. For example:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)
Fully refresh a streaming table
Full refreshes reprocess all data available in the source with the latest definition. Avoid running full refreshes on sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka, because a full refresh truncates the existing data. You might not be able to recover old data if it is no longer available in the source.
For example:
REFRESH STREAMING TABLE my_bronze_table FULL
Schedule a streaming table for automatic refresh
To configure a streaming table to automatically refresh based on a defined schedule, paste the following into the query editor, and then click Run:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
For example refresh schedule queries, see ALTER STREAMING TABLE.
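As a sketch of the syntax above, the following statement would schedule a refresh once a day at midnight. The table name, cron expression, and time zone are illustrative assumptions. The cron string uses Quartz syntax (seconds, minutes, hours, day of month, month, day of week, year).

```sql
/* Refresh every day at 00:00:00 in the given time zone */
ALTER STREAMING TABLE my_bronze_table
  ADD SCHEDULE CRON '0 0 0 * * ? *'
  AT TIME ZONE 'America/Los_Angeles';
```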
Track the status of a refresh
You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the Delta Live Tables UI or by viewing the Refresh Information returned by the DESCRIBE EXTENDED command for the streaming table.
DESCRIBE EXTENDED <table-name>
Streaming ingestion from Kafka
For an example of streaming ingestion from Kafka, see read_kafka.
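For orientation, a streaming table that reads from Kafka might be defined along the following lines. This is a minimal sketch: the broker address and topic are placeholders, and any authentication options your cluster requires are omitted.

```sql
/* Load data from a Kafka topic (placeholder broker and topic) */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<host>:<port>',
  subscribe => '<topic>'
)
```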
Grant users access to a streaming table
To grant users the SELECT privilege on the streaming table so they can query it, paste the following into the query editor, and then click Run:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.
Monitor runs using query history
You can use the query history page to access query details and query profiles that can help you identify poorly performing queries and bottlenecks in the Delta Live Tables pipeline used to run your streaming table updates. For an overview of the kind of information available in query histories and query profiles, see Query history and Query profile.
Preview
This feature is in Public Preview. Workspace admins can enable this feature from the Previews page. See Manage Databricks Previews.
All statements related to streaming tables appear in the query history. You can use the Statement drop-down filter to select any command and inspect the related queries. All CREATE statements are followed by a REFRESH statement that executes asynchronously on a Delta Live Tables pipeline. The REFRESH statements typically include detailed query plans that provide insights into optimizing performance.
To access REFRESH statements in the query history UI, use the following steps:
Click Query History in the left sidebar to open the Query History UI.
Select the REFRESH checkbox from the Statement drop-down filter.
Click the name of the query statement to view summary details like the duration of the query and aggregated metrics.
Click See query profile to open the query profile. See Query profile for details about navigating the query profile.
Optionally, you can use the links in the Query Source section to open the related query or pipeline.
You can also access query details using links in the SQL editor or from a notebook attached to a SQL warehouse.
Note
Your streaming table must be configured to run using the preview channel. See Set the runtime channel.