Load data using streaming tables in Databricks SQL
Preview
This feature is in Public Preview. To sign up for access, fill out this form.
Databricks recommends using streaming tables to ingest data with Databricks SQL. A streaming table is a Unity Catalog managed table with extra support for streaming or incremental data processing. A DLT pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.
This article demonstrates using streaming tables to load data from cloud object storage configured as a Unity Catalog volume (recommended) or external location.
Note
To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.
Before you begin
Before you begin, make sure you have the following:
A Databricks account with serverless enabled. For more information, see Use serverless SQL warehouses.
A workspace with Unity Catalog enabled. For more information, see Get started using Unity Catalog.
A SQL warehouse that uses the Current channel.
The READ FILES privilege on a Unity Catalog external location. For more information, see Manage external locations and storage credentials.
The USE CATALOG privilege on the catalog in which you create the streaming table.
The USE SCHEMA privilege on the schema in which you create the streaming table.
The CREATE TABLE privilege on the schema in which you create the streaming table.
The path to your source data.
Volume path example:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
External location path example:
s3://myBucket/analysis
Note
This article assumes the data you want to load is in a cloud storage location that corresponds to a Unity Catalog volume or external location you have access to.
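As a sketch, a metastore admin or the owner of these objects could grant the privileges listed above with statements like the following. All names here (catalog main, schema main.ingest, volume main.ingest.landing, external location my_ext_location, group data_engineers) are hypothetical placeholders. The READ VOLUME grant is only needed if your source is a volume rather than an external location.

```sql
/* Hypothetical names: catalog main, schema main.ingest, volume main.ingest.landing,
   external location my_ext_location, group data_engineers */

/* Allow the group to read files under the external location */
GRANT READ FILES ON EXTERNAL LOCATION my_ext_location TO `data_engineers`;

/* Alternative for a volume source: allow the group to read files in the volume */
GRANT READ VOLUME ON VOLUME main.ingest.landing TO `data_engineers`;

/* Allow the group to use the catalog and schema that will hold the streaming table */
GRANT USE CATALOG ON CATALOG main TO `data_engineers`;
GRANT USE SCHEMA ON SCHEMA main.ingest TO `data_engineers`;

/* Allow the group to create the streaming table in the schema */
GRANT CREATE TABLE ON SCHEMA main.ingest TO `data_engineers`;
```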
Discover and preview source data
In the sidebar of your workspace, click Queries, and then click Create query.
In the query editor, select a SQL warehouse that uses the Current channel from the drop-down list.
Paste the following into the editor, substituting values in angle brackets (<>) for the information identifying your source data, and then click Run.
/* Discover your data in a volume */
LIST '/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>';
/* Preview your data in a volume */
SELECT * FROM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>') LIMIT 10;
/* Discover your data in an external location */
LIST 's3://<bucket>/<path>/<folder>';
/* Preview your data in an external location */
SELECT * FROM read_files('s3://<bucket>/<path>/<folder>') LIMIT 10;
Note
You might encounter schema inference errors when running the read_files table-valued function if its default settings can't parse your data. For example, you might need to enable multi-line mode for multi-line CSV or JSON files. For a list of parser options, see read_files table-valued function.
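If inference fails on multi-line records, one approach is to pass parser options to read_files explicitly. The following is a minimal sketch using the format, header, and multiLine options; the paths are the same placeholders used above, and whether you need each option depends on your files.

```sql
/* Preview multi-line JSON files, where a single record spans several lines */
SELECT * FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>',
  format => 'json',
  multiLine => true
) LIMIT 10;

/* Preview CSV files that have a header row and quoted values that span lines */
SELECT * FROM read_files(
  's3://<bucket>/<path>/<folder>',
  format => 'csv',
  header => true,
  multiLine => true
) LIMIT 10;
```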
Load data into a streaming table
To create a streaming table from data in cloud object storage, paste the following into the query editor, and then click Run:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>');
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>');
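The same parser options used for previewing can be passed inside the streaming table definition, so the table does not depend on format inference. A sketch, assuming a hypothetical table main.ingest.orders_bronze and a hypothetical volume folder of CSV files with a header row:

```sql
/* Hypothetical example: load CSV files with a header row into a
   fully qualified streaming table in catalog main, schema ingest */
CREATE OR REFRESH STREAMING TABLE main.ingest.orders_bronze AS
SELECT * FROM STREAM read_files(
  '/Volumes/main/ingest/landing/orders',
  format => 'csv',
  header => true
);
```

Using a three-level name (catalog.schema.table) makes it explicit where the table is created, regardless of the current catalog and schema in the editor session.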
Refresh a streaming table using a DLT pipeline
This section describes patterns for refreshing a streaming table with the latest data available from the sources defined in the query.
CREATE operations for streaming tables use a Databricks SQL warehouse for the initial creation and loading of data into the streaming table. REFRESH operations for streaming tables use Delta Live Tables (DLT). A DLT pipeline is automatically created for each streaming table. When a streaming table is refreshed, an update to the DLT pipeline is started to process the refresh.
After you run the REFRESH command, a link to the DLT pipeline is returned. Use this link to check the status of the refresh.
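A minimal sketch of an incremental refresh, assuming a hypothetical table main.ingest.orders_bronze. Without the FULL keyword, the refresh processes only source data that has arrived since the previous refresh.

```sql
/* Incrementally refresh the streaming table (hypothetical name).
   The command output includes the link to the DLT pipeline update. */
REFRESH STREAMING TABLE main.ingest.orders_bronze;
```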
Note
Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed.
See What is Delta Live Tables?.
Ingest new data only
By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. Only data that arrives in the directory after table creation is then processed. For example:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)
Fully refresh a streaming table
A full refresh truncates the streaming table and reprocesses all data available in the source using the latest table definition. Avoid full refreshes on sources that don't keep the entire history of the data or that have short retention periods, such as Kafka: because the existing data is truncated, you might not be able to recover old data that is no longer available in the source.
For example:
REFRESH STREAMING TABLE my_bronze_table FULL
Schedule a streaming table for automatic refresh
To configure a streaming table to automatically refresh based on a defined schedule, paste the following into the query editor, and then click Run:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD SCHEDULE [REFRESH]
  CRON '<cron-string>'
  [ AT TIME ZONE '<timezone-id>' ];
For examples of refresh schedule queries, see ALTER STREAMING TABLE.
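As a concrete sketch, the following schedules an hourly refresh and later removes the schedule. It assumes a hypothetical table main.ingest.orders_bronze and that the cron string uses Quartz cron syntax (seconds, minutes, hours, day of month, month, day of week); check the ALTER STREAMING TABLE reference for the exact format your workspace accepts.

```sql
/* Refresh at minute 0 of every hour, evaluated in Pacific time.
   Quartz fields: seconds minutes hours day-of-month month day-of-week */
ALTER STREAMING TABLE main.ingest.orders_bronze
ADD SCHEDULE CRON '0 0 * * * ?' AT TIME ZONE 'America/Los_Angeles';

/* Remove the schedule to return to manual refreshes */
ALTER STREAMING TABLE main.ingest.orders_bronze DROP SCHEDULE;
```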
Track the status of a refresh
You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the Delta Live Tables UI, or by viewing the Refresh Information returned by the DESCRIBE EXTENDED command for the streaming table.
DESCRIBE EXTENDED <table-name>
Streaming ingestion from Kafka
For an example of streaming ingestion from Kafka, see read_kafka.
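As a rough sketch of the pattern, a streaming table can read from Kafka with read_kafka in the same way it reads from files with read_files. The broker address, topic, and table name below are hypothetical, and real clusters usually also require authentication options, which are omitted here; see read_kafka for the supported options.

```sql
/* Sketch: ingest a Kafka topic into a streaming table.
   Broker, topic, and table name are hypothetical placeholders;
   authentication options are omitted. */
CREATE OR REFRESH STREAMING TABLE main.ingest.events_bronze AS
SELECT
  CAST(key AS STRING) AS event_key,     /* Kafka keys and values arrive as binary */
  CAST(value AS STRING) AS event_value,
  topic,
  `timestamp`
FROM STREAM read_kafka(
  bootstrapServers => 'broker-1.example.com:9092',
  subscribe => 'events'
);
```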
Grant users access to a streaming table
To grant users the SELECT privilege on the streaming table so they can query it, paste the following into the query editor, and then click Run:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.
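In Unity Catalog, querying a table also requires USE CATALOG and USE SCHEMA on its parent catalog and schema. A sketch of the full set of grants for readers, assuming a hypothetical table main.ingest.orders_bronze and a hypothetical group analysts, followed by a check of the result:

```sql
/* Hypothetical names: table main.ingest.orders_bronze, group analysts */
GRANT USE CATALOG ON CATALOG main TO `analysts`;
GRANT USE SCHEMA ON SCHEMA main.ingest TO `analysts`;
GRANT SELECT ON TABLE main.ingest.orders_bronze TO `analysts`;

/* List the privileges now granted on the table */
SHOW GRANTS ON TABLE main.ingest.orders_bronze;
```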