Skip to main content

Develop pipeline code with SQL

DLT introduces several new SQL keywords and functions for defining materialized views and streaming tables in pipelines. SQL support for developing pipelines builds upon the basics of Spark SQL and adds support for Structured Streaming functionality.

Users familiar with PySpark DataFrames might prefer developing pipeline code with Python. Python supports more extensive testing and operations that are challenging to implement with SQL, such as metaprogramming operations. See Develop pipeline code with Python.

For a full reference of DLT SQL syntax, see DLT SQL language reference.

Basics of SQL for pipeline development

SQL code that creates DLT datasets uses the CREATE OR REFRESH syntax to define materialized views and streaming tables against query results.

The STREAM keyword indicates if the data source referenced in a SELECT clause should be read with streaming semantics.

Reads and writes default to the catalog and schema specified during pipeline configuration. See Set the target catalog and schema.

DLT source code critically differs from SQL scripts: DLT evaluates all dataset definitions across all source code files configured in a pipeline and builds a dataflow graph before any queries are run. The order of queries appearing in a notebook or script defines the order of code evaluation, but not the order of query execution.

Create a materialized view with SQL

The following code example demonstrates the basic syntax for creating a materialized view with SQL:

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Create a streaming table with SQL

The following code example demonstrates the basic syntax for creating a streaming table with SQL. When reading a source for a streaming table, the STREAM keyword indicates to use streaming semantics for the source. Do not use the STREAM keyword when creating a materialized view:

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
note

Use the STREAM keyword to use streaming semantics to read from the source. If the read encounters a change or deletion to an existing record, an error is thrown. It is safest to read from static or append-only sources. To ingest data that has change commits, you can use Python and the SkipChangeCommits option to handle errors.

Load data from object storage

DLT supports loading data from all formats supported by Databricks. See Data format options.

note

These examples use data available under the /databricks-datasets automatically mounted to your workspace. Databricks recommends using volume paths or cloud URIs to reference data stored in cloud object storage. See What are Unity Catalog volumes?.

Databricks recommends using Auto Loader and streaming tables when configuring incremental ingestion workloads against data stored in cloud object storage. See What is Auto Loader?.

SQL uses the read_files function to invoke Auto Loader functionality. You must also use the STREAM keyword to configure a streaming read with read_files.

The following describes the syntax for read_files in SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

Options for Auto Loader are key-value pairs. For details on supported formats and options, see Options.

The following example creates a streaming table from JSON files using Auto Loader:

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

The read_files function also supports batch semantics to create materialized views. The following example uses batch semantics to read a JSON directory and create a materialized view:

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

Validate data with expectations

You can use expectations to set and enforce data quality constraints. See Manage data quality with pipeline expectations.

The following code defines an expectation named valid_data that drops records that are null during data ingestion:

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Query materialized views and streaming tables defined in your pipeline

The following example defines four datasets:

  • A streaming table named orders that loads JSON data.
  • A materialized view named customers that loads CSV data.
  • A materialized view named customer_orders that joins records from the orders and customers datasets, casts the order timestamp to a date, and selects the customer_id, order_number, state, and order_date fields.
  • A materialized view named daily_orders_by_state that aggregates the daily count of orders for each state.
note

When querying views or tables in your pipeline, you can specify the catalog and schema directly, or you can use the defaults configured in your pipeline. In this example, the orders, customers, and customer_orders tables are written and read from the default catalog and schema configured for your pipeline.

Legacy publishing mode uses the LIVE schema to query other materialized views and streaming tables defined in your pipeline. In new pipelines, the LIVE schema syntax is silently ignored. See LIVE schema (legacy).

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Define a private table

You can use the PRIVATE clause when creating a materialized view or a streaming table. When you create a private table, you create the table, but do not create the metadata for the table. The PRIVATE clause instructs DLT to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a private table persists for the lifetime of the pipeline that creates it, and not just a single update.

Private tables can have the same name as tables in the catalog. If you specify an unqualified name for a table within a pipeline, if there is both a private table and a catalog table with that name, the private table will be used.

Private tables were previously refererred to as temporary tables.

Permanently delete records from a materialized view or streaming table

To permanently delete records from a streaming table with deletion vectors enabled, such as for GDPR compliance, additional operations must be performed on the object’s underlying Delta tables. To ensure the deletion of records from a streaming table, see Permanently delete records from a streaming table.

Materialized views always reflect the data in the underlying tables when they are refreshed. To delete data in a Materialized view, you must delete the data from the source and refresh the materialized view.

Parameterize values used when declaring tables or views with SQL

Use SET to specify a configuration value in a query that declares a table or view, including Spark configurations. Any table or view you define in a notebook after the SET statement has access to the defined value. Any Spark configurations specified using the SET statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}. The following example sets a Spark configuration value named startDate and uses that value in a query:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

To specify multiple configuration values, use a separate SET statement for each value.

Limitations

The PIVOT clause is not supported. The pivot operation in Spark requires the eager loading of input data to compute the output schema. This capability is not supported in DLT.

note

The CREATE OR REFRESH LIVE TABLE syntax to create a materialized view is deprecated. Instead, use CREATE OR REFRESH MATERIALIZED VIEW.