Load data using streaming tables in Databricks SQL

Databricks recommends using streaming tables to ingest data using Databricks SQL. A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A Delta Live Tables pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.

This article demonstrates using streaming tables to load data from cloud object storage configured as a Unity Catalog volume (recommended) or external location.

Note

To learn how to use Delta Lake tables as streaming sources and sinks, see Delta table streaming reads and writes.

Important

Streaming tables created in Databricks SQL are backed by a serverless Delta Live Tables pipeline. Your workspace must support serverless pipelines to use this functionality.

Before you begin

Before you begin, you must meet the following requirements.

Workspace requirements:

Compute requirements:

You must use one of the following:

  • A SQL warehouse that uses the Current channel.

  • Compute with shared access mode on Databricks Runtime 13.3 LTS or above.

  • Compute with single user access mode on Databricks Runtime 15.4 LTS or above.

    On Databricks Runtime 15.3 and below, you cannot use single user compute to query streaming tables that are owned by other users. You can use single user compute on Databricks Runtime 15.3 and below only if you own the streaming table. The creator of the table is the owner.

    Databricks Runtime 15.4 LTS and above support queries on Delta Live Tables-generated tables on single user compute, regardless of table ownership. To take advantage of the data filtering provided in Databricks Runtime 15.4 LTS and above, you must confirm that your workspace is enabled for serverless compute because the data filtering functionality that supports Delta Live Tables-generated tables runs on serverless compute. You could be charged for serverless compute resources when you use single user compute to run data filtering operations. See Fine-grained access control on single user compute.

Permissions requirements:

  • The READ FILES privilege on a Unity Catalog external location. For information, see Create an external location to connect cloud storage to Databricks.

  • The USE CATALOG privilege on the catalog in which you create the streaming table.

  • The USE SCHEMA privilege on the schema in which you create the streaming table.

  • The CREATE TABLE privilege on the schema in which you create the streaming table.

Other requirements:

  • The path to your source data.

    Volume path example: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    External location path example: s3://myBucket/analysis

    Note

    This article assumes the data you want to load is in a cloud storage location that corresponds to a Unity Catalog volume or external location that you have access to.

Discover and preview source data

  1. In the sidebar of your workspace, click Queries, and then click Create query.

  2. In the query editor, select a SQL warehouse that uses the Current channel from the drop-down list.

  3. Paste the following into the editor, substituting values in angle brackets (<>) for the information identifying your source data, and then click Run.

    Note

    You might encounter schema inference errors when running the read_files table valued function if the defaults for the function can’t parse your data. For example, you might need to configure multi-line mode for multi-line CSV or JSON files. For a list of parser options, see read_files table-valued function.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
    

Load data into a streaming table

To create a streaming table from data in cloud object storage, paste the following into the query editor, and then click Run:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')

Set the runtime channel

Streaming tables created using SQL warehouses are automatically refreshed using a Delta Live Tables pipeline. Delta Live Tables pipelines use the runtime in the current channel by default. See Delta Live Tables release notes and the release upgrade process to learn about the release process.

Databricks recommends using the current channel for production workloads. New features are first released to the preview channel. You can set a pipeline to the preview Delta Live Tables channel to test new features by specifying preview as a table property. You can specify this property when you create the table or after the table is created using an ALTER statement.

The following code example shows how to set the channel to preview in a CREATE statement:

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Refresh a streaming table using a DLT pipeline

This section describes patterns for refreshing a streaming table with the latest data available from the sources defined in the query.

When you CREATE or REFRESH a streaming table, the update processes using a serverless Delta Live Tables pipeline. Each streaming table you define has an associated Delta Live Tables pipeline.

After you run the REFRESH command, the DLT pipeline link is returned. You can use the DLT pipeline link to check the status of the refresh.

Note

Only the table owner can refresh a streaming table to get the latest data. The user that creates the table is the owner, and the owner can’t be changed. You might need to refresh your streaming table before using time travel queries.

See What is Delta Live Tables?.

Ingest new data only

By default, the read_files function reads all existing data in the source directory during table creation, and then processes newly arriving records with each refresh.

To avoid ingesting data that already exists in the source directory at the time of table creation, set the includeExistingFiles option to false. This means that only data that arrives in the directory after table creation is processed. For example:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

Fully refresh a streaming table

Full refreshes re-process all data available in the source with the latest definition. It is not recommended to call full refreshes on sources that don’t keep the entire history of the data or have short retention periods, such as Kafka, because the full refresh truncates the existing data. You might not be able to recover old data if the data is no longer available in the source.

For example:

REFRESH STREAMING TABLE my_bronze_table FULL

Schedule a streaming table for automatic refresh

To configure a streaming table to automatically refresh based on a defined schedule, paste the following into the query editor, and then click Run:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

For example refresh schedule queries, see ALTER STREAMING TABLE.

Track the status of a refresh

You can view the status of a streaming table refresh by viewing the pipeline that manages the streaming table in the Delta Live Tables UI or by viewing the Refresh Information returned by the DESCRIBE EXTENDED command for the streaming table.

DESCRIBE EXTENDED <table-name>

Streaming ingestion from Kafka

For an example of streaming ingestion from Kafka, see read_kafka.

Grant users access to a streaming table

To grant users the SELECT privilege on the streaming table so they can query it, paste the following into the query editor, and then click Run:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

For more information about granting privileges on Unity Catalog securable objects, see Unity Catalog privileges and securable objects.

Monitor runs using query history

You can use the query history page to access query details and query profiles that can help you identify poorly performing queries and bottlenecks in the Delta Live Tables pipeline used to run your streaming table updates. For an overview of the kind of information available in query histories and query profiles, see Query history and Query profile.

Preview

This feature is in Public Preview. Workspace admins can enable this feature from the Previews page. See Manage Databricks Previews.

All statements related to streaming tables appear in the query history. You can use the Statement drop-down filter to select any command and inspect the related queries. All CREATE statements are followed by a REFRESH statement that executes asynchronously on a Delta Live Tables pipeline. The REFRESH statements typically include detailed query plans that provide insights into optimizing performance.

To access REFRESH statements in the query history UI, use the following steps:

  1. Click History Icon in the left sidebar to open the Query History UI.

  2. Select the REFRESH checkbox from the Statement drop-down filter.

  3. Click the name of the query statement to view summary details like the duration of the query and aggregated metrics.

  4. Click See query profile to open the query profile. See Query profile for details about navigating the query profile.

  5. Optionally, you can use the links in the Query Source section to open the related query or pipeline.

You can also access query details using links in the SQL editor or from a notebook attached to a SQL warehouse.

Note

Your streaming table must be configured to run using the preview channel. See Set the runtime channel.