Load data using streaming tables in Databricks SQL

Preview

This feature is in Public Preview. To sign up for access, fill out this form.

Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a Unity Catalog managed table with extra support for streaming or incremental data processing. A Delta Live Tables (DLT) pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.

This article demonstrates using streaming tables to load data from cloud object storage configured as a Unity Catalog volume (recommended) or external location.

Note

To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.

Before you begin

Make sure you have the following:

  • A Databricks account with serverless enabled. For more information, see Enable serverless SQL warehouses.

  • A workspace with Unity Catalog enabled. For more information, see Set up and manage Unity Catalog.

  • A SQL warehouse that uses the Current channel.

  • Shared compute running Databricks Runtime 13.3 LTS or above, or a SQL warehouse, to query streaming tables created by a Delta Live Tables pipeline. Streaming tables created in a Unity Catalog-enabled pipeline cannot be queried from assigned or no-isolation clusters.

  • The READ FILES privilege on a Unity Catalog external location. For information, see Create an external location to connect cloud storage to Databricks.

  • The USE CATALOG privilege on the catalog in which you create the streaming table.

  • The USE SCHEMA privilege on the schema in which you create the streaming table.

  • The CREATE TABLE privilege on the schema in which you create the streaming table.

  • The path to your source data.

    Volume path example: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    External location path example: s3://myBucket/analysis

    Note

    This article assumes the data you want to load is in a cloud storage location that corresponds to a Unity Catalog volume or external location you have access to.
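The privileges listed above could be granted by a metastore admin or object owner with statements along the following lines. This is a minimal sketch, not a required setup: the external location my_location, the catalog main, the schema default, and the principal `data-engineers` are all hypothetical placeholders.

```sql
-- Hypothetical names throughout: replace my_location, main, default,
-- and `data-engineers` with your own objects and principal.

-- Allow reading source files through the external location.
GRANT READ FILES ON EXTERNAL LOCATION my_location TO `data-engineers`;

-- Allow navigating to the catalog and schema that will hold the table.
GRANT USE CATALOG ON CATALOG main TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA main.default TO `data-engineers`;

-- Allow creating the streaming table in that schema.
GRANT CREATE TABLE ON SCHEMA main.default TO `data-engineers`;
```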

Discover and preview source data

  1. In the sidebar of your workspace, click Queries, and then click Create query.

  2. In the query editor, select a SQL warehouse that uses the Current channel from the drop-down list.

  3. Paste the following into the editor, substituting values in angle brackets (<>) for the information identifying your source data, and then click Run.

    Note

    You might encounter schema inference errors when running the read_files table-valued function if the defaults for the function can’t parse your data. For example, you might need to configure multi-line mode for multi-line CSV or JSON files. For a list of parser options, see read_files table-valued function.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>";
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10;
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>";
    
    /* Preview your data in an external location */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10;
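If the default preview fails on schema inference, passing parser options explicitly is one way to proceed. The following sketch assumes the source files are JSON with records that span multiple lines; format and multiLine are options of read_files, and the path placeholders are the same as in the preceding queries.

```sql
/* Preview multi-line JSON files in a volume (assumes the source files are JSON) */
SELECT *
FROM read_files(
  '/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>',
  format => 'json',
  multiLine => true
)
LIMIT 10;
```

For other file types, adjust the format value and the format-specific options to match your data.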
    

Load data into a streaming table

To create a streaming table from data in cloud object storage, paste one of the following statements into the query editor, substituting values in angle brackets (<>) for your own, and then click Run:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')
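As a concrete illustration of this pattern, the following sketch loads CSV files with a header row into a streaming table. The table name sales_bronze and the bucket path are hypothetical, and the format and header options assume the source files are CSV.

```sql
-- Hypothetical example: the table name, source path, and CSV layout are assumptions.
CREATE OR REFRESH STREAMING TABLE sales_bronze AS
SELECT *
FROM STREAM read_files(
  's3://mybucket/analysis',
  format => 'csv',
  header => true
);
```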

Refresh a streaming table using a DLT pipeline

This section describes patterns for refreshing a streaming table with the latest data available from the sources defined in the query.

CREATE operations for streaming tables use a Databricks SQL warehouse for the initial creation and loading of data into the streaming table. REFRESH operations for streaming tables use Delta Live Tables (DLT). A DLT pipeline is automatically created for each streaming table. When a streaming table is refreshed, an update to the DLT pipeline is initiated to process the refresh.

After you run the REFRESH command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.
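A manual refresh of this kind is a single statement. The following sketch assumes that a streaming table named my_bronze_table, the example name used later in this article, already exists and that you are its owner.

```sql
-- Process data that has arrived in the source since the last refresh.
REFRESH STREAMING TABLE my_bronze_table;
```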

Note

Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed.

For more information, see What is Delta Live Tables?

Ingest new data only

By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.

To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. This means that only data that arrives in the directory after table creation is processed. For example:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

Fully refresh a streaming table

Full refreshes re-process all data available in the source with the latest definition of the streaming table. Full refreshes are not recommended for sources that don’t keep the entire history of the data or that have short retention periods, such as Kafka, because a full refresh truncates the existing data. You might not be able to recover old data if the data is no longer available in the source.

For example:

REFRESH STREAMING TABLE my_bronze_table FULL

Schedule a streaming table for automatic refresh

To configure a streaming table to refresh automatically on a defined schedule, run a statement with the following syntax in the query editor. Square brackets ([]) mark optional clauses and are not part of the statement:

ALTER STREAMING TABLE
[[<catalog>.]<schema>.]<name>
ADD SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ];

For example refresh schedule queries, see ALTER STREAMING TABLE.
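For instance, a schedule that refreshes a table at the top of every hour might look like the following. This is a sketch under assumptions: my_bronze_table is the example table name used earlier in this article, the cron string is assumed to follow Quartz syntax (seconds, minutes, hours, day of month, month, day of week, optional year), and the time zone is an arbitrary choice.

```sql
-- Refresh at minute 0 of every hour, evaluated in the given time zone.
ALTER STREAMING TABLE my_bronze_table
ADD SCHEDULE CRON '0 0 * * * ? *'
  AT TIME ZONE 'America/Los_Angeles';
```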

Track the status of a refresh

To check the status of a streaming table refresh, view the pipeline that manages the streaming table in the Delta Live Tables UI, or check the Refresh Information returned by the DESCRIBE EXTENDED command for the streaming table.

DESCRIBE EXTENDED <table-name>

Streaming ingestion from Kafka

For an example of streaming ingestion from Kafka, see read_kafka.

Grant users access to a streaming table

To grant users the SELECT privilege on the streaming table so they can query it, paste the following into the query editor, and then click Run:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.
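A concrete version of this grant might look like the following sketch. The table main.default.my_bronze_table and the group `analysts` are hypothetical. In Unity Catalog, a principal also needs USE CATALOG and USE SCHEMA on the parent catalog and schema to query the table, so the sketch grants those as well.

```sql
-- Hypothetical table and group names.
GRANT USE CATALOG ON CATALOG main TO `analysts`;
GRANT USE SCHEMA ON SCHEMA main.default TO `analysts`;
GRANT SELECT ON TABLE main.default.my_bronze_table TO `analysts`;

-- Confirm which privileges are now in place on the table.
SHOW GRANTS ON TABLE main.default.my_bronze_table;
```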