Load data using streaming tables in Databricks SQL
Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A Delta Live Tables pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.
This article demonstrates using streaming tables to load data from cloud object storage configured as a Unity Catalog volume (recommended) or external location.
Note
To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.
Before you begin
Before you begin, you must meet the following requirements.
Workspace requirements:
A Databricks account with serverless enabled. For more information, see Enable serverless SQL warehouses.
A workspace with Unity Catalog enabled. For more information, see Set up and manage Unity Catalog.
Compute requirements:
You must use one of the following:
A SQL warehouse that uses the
Current
channel.Compute with shared access mode on Databricks Runtime 13.3 LTS or above.
Compute with single user access mode on Databricks Runtime 15.4 LTS or above.
On Databricks Runtime 15.3 and below, you cannot use single user compute to query streaming tables that are owned by other users. You can use single user compute on Databricks Runtime 15.3 and below only if you own the streaming table. The creator of the table is the owner.
Databricks Runtime 15.4 LTS and above support queries on Delta Live Tables-generated tables on single user compute, regardless of table ownership. To take advantage of the data filtering provided in Databricks Runtime 15.4 LTS and above, you must confirm that your workspace is enabled for serverless compute because the data filtering functionality that supports Delta Live Tables-generated tables runs on serverless compute. You could be charged for serverless compute resources when you use single user compute to run data filtering operations. See Fine-grained access control on single user compute.
Permissions requirements:
The
READ FILES
privilege on a Unity Catalog external location. For information, see Create an external location to connect cloud storage to Databricks.The
USE CATALOG
privilege on the catalog in which you create the streaming table.The
USE SCHEMA
privilege on the schema in which you create the streaming table.The
CREATE TABLE
privilege on the schema in which you create the streaming table.
Other requirements:
The path to your source data.
Volume path example:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
External location path example:
s3://myBucket/analysis
Note
This article assumes the data you want to load is in a cloud storage location that corresponds to a Unity Catalog volume or external location that you have access to.
Discover and preview source data
In the sidebar of your workspace, click Queries, and then click Create query.
In the query editor, select a SQL warehouse that uses the
Current
channel from the drop-down list.Paste the following into the editor, substituting values in angle brackets (
<>
) for the information identifying your source data, and then click Run.Note
You might encounter schema inference errors when running the
read_files
table valued function if the defaults for the function can’t parse your data. For example, you might need to configure multi-line mode for multi-line CSV or JSON files. For a list of parser options, see read_files table-valued function./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "s3://<bucket>/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
Load data into a streaming table
To create a streaming table from data in cloud object storage, paste the following into the query editor, and then click Run:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')
Refresh a streaming table using a DLT pipeline
This section describes patterns for refreshing a streaming table with the latest data available from the sources defined in the query.
When you CREATE
or REFRESH
a streaming table, the update processes using a serverless Delta Live Tables pipeline. Each streaming table you define has an associated Delta Live Tables pipeline.
After you run the REFRESH
command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.
Note
Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed.
See What is Delta Live Tables?.
Ingest new data only
By default, the read_files
function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.
To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles
option to false
. This means that only data that arrives in the directory after table creation is processed. For example:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)
Fully refresh a streaming table
Full refreshes re-process all data available in the source with the latest definition. It is not recommended to call full refreshes on sources that don’t keep the entire history of the data or have short retention periods, such as Kafka, because the full refresh truncates the existing data. You might not be able to recover old data if the data is no longer available in the source.
For example:
REFRESH STREAMING TABLE my_bronze_table FULL
Schedule a streaming table for automatic refresh
To configure a streaming table to automatically refresh based on a defined schedule, paste the following into the query editor, and then click Run:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
For example refresh schedule queries, see ALTER STREAMING TABLE.
Track the status of a refresh
You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the Delta Live Tables UI or by viewing the Refresh Information returned by the DESCRIBE EXTENDED
command for the streaming table.
DESCRIBE EXTENDED <table-name>
Streaming ingestion from Kafka
For an example of streaming ingestion from Kafka, see read_kafka.
Grant users access to a streaming table
To grant users the SELECT
privilege on the streaming table so they can query it, paste the following into the query editor, and then click Run:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.