Develop pipeline code with SQL
DLT introduces several new SQL keywords and functions for defining materialized views and streaming tables in pipelines. SQL support for developing pipelines builds upon the basics of Spark SQL and adds support for Structured Streaming functionality.
Users familiar with PySpark DataFrames might prefer developing pipeline code with Python. Python supports more extensive testing and operations that are challenging to implement with SQL, such as metaprogramming operations. See Develop pipeline code with Python.
For a full reference of DLT SQL syntax, see DLT SQL language reference.
Basics of SQL for pipeline development
SQL code that creates DLT datasets uses the CREATE OR REFRESH syntax to define materialized views and streaming tables against query results. The STREAM keyword indicates if the data source referenced in a SELECT clause should be read with streaming semantics.
Reads and writes default to the catalog and schema specified during pipeline configuration. See Set the target catalog and schema.
DLT source code critically differs from SQL scripts: DLT evaluates all dataset definitions across all source code files configured in a pipeline and builds a dataflow graph before any queries are run. The order of queries appearing in a notebook or script defines the order of code evaluation, but not the order of query execution.
Create a materialized view with SQL
The following code example demonstrates the basic syntax for creating a materialized view with SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Create a streaming table with SQL
The following code example demonstrates the basic syntax for creating a streaming table with SQL. When reading a source for a streaming table, the STREAM keyword tells DLT to read the source with streaming semantics. Do not use the STREAM keyword when creating a materialized view:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Use the STREAM keyword to read from the source with streaming semantics. If the read encounters a change to or deletion of an existing record, an error is thrown, so it is safest to read from static or append-only sources. To ingest data that contains change commits, you can use Python and the skipChangeCommits option to handle errors.
Load data from object storage
DLT supports loading data from all formats supported by Databricks. See Data format options.
These examples use data available under the /databricks-datasets directory automatically mounted to your workspace. Databricks recommends using volume paths or cloud URIs to reference data stored in cloud object storage. See What are Unity Catalog volumes?.
Databricks recommends using Auto Loader and streaming tables when configuring incremental ingestion workloads against data stored in cloud object storage. See What is Auto Loader?.
SQL uses the read_files function to invoke Auto Loader functionality. You must also use the STREAM keyword to configure a streaming read with read_files.
The following describes the syntax for read_files in SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
Options for Auto Loader are key-value pairs. For details on supported formats and options, see Options.
The following example creates a streaming table from JSON files using Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
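You can pass multiple options to read_files as additional key-value arguments. The following is a minimal sketch that reads the CSV customers dataset used later in this article; the header option value and the ingestion_csv_st table name are illustrative assumptions:
CREATE OR REFRESH STREAMING TABLE ingestion_csv_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/customers",
format => "csv",
header => true);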
The read_files function also supports batch semantics to create materialized views. The following example uses batch semantics to read a JSON directory and create a materialized view:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Validate data with expectations
You can use expectations to set and enforce data quality constraints. See Manage data quality with pipeline expectations.
The following code defines an expectation named valid_date that drops records where order_datetime is null or empty during data ingestion:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
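Expectations can also fail an update instead of dropping rows. As a hedged sketch, the following variant (the orders_strict table name is illustrative) uses ON VIOLATION FAIL UPDATE so that any violating record stops the update:
CREATE OR REFRESH STREAMING TABLE orders_strict(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");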
Query materialized views and streaming tables defined in your pipeline
The following example defines four datasets:
- A streaming table named orders that loads JSON data.
- A materialized view named customers that loads CSV data.
- A materialized view named customer_orders that joins records from the orders and customers datasets, casts the order timestamp to a date, and selects the customer_id, order_number, state, and order_date fields.
- A materialized view named daily_orders_by_state that aggregates the daily count of orders for each state.
When querying views or tables in your pipeline, you can specify the catalog and schema directly, or you can use the defaults configured in your pipeline. In this example, the orders, customers, and customer_orders tables are written to and read from the default catalog and schema configured for your pipeline.
Legacy publishing mode uses the LIVE schema to query other materialized views and streaming tables defined in your pipeline. In new pipelines, the LIVE schema syntax is silently ignored. See LIVE schema (legacy).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Define a private table
You can use the PRIVATE clause when creating a materialized view or a streaming table. When you create a private table, you create the table but do not create the metadata for the table. The PRIVATE clause instructs DLT to create a table that is available to the pipeline but should not be accessed outside the pipeline. To reduce processing time, a private table persists for the lifetime of the pipeline that creates it, and not just a single update.
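The following is a minimal sketch of a private streaming table, assuming the PRIVATE keyword is placed before the object type; see the DLT SQL language reference for the exact grammar. The raw_orders_private name is a placeholder:
CREATE PRIVATE STREAMING TABLE raw_orders_private
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");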
Private tables can have the same name as tables in the catalog. If you specify an unqualified table name within a pipeline and both a private table and a catalog table exist with that name, the private table is used.
Private tables were previously referred to as temporary tables.
Permanently delete records from a materialized view or streaming table
To permanently delete records from a streaming table with deletion vectors enabled, such as for GDPR compliance, additional operations must be performed on the object’s underlying Delta tables. To ensure the deletion of records from a streaming table, see Permanently delete records from a streaming table.
Materialized views always reflect the data in the underlying tables when they are refreshed. To delete data in a materialized view, you must delete the data from the source and refresh the materialized view.
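For illustration, the following hedged sketch assumes a hypothetical source table main.default.src_orders; the DELETE runs outside the pipeline, and the materialized view reflects the change only after its next refresh:
-- Run outside the pipeline against the hypothetical source table.
DELETE FROM main.default.src_orders WHERE customer_id = '12345';
-- Then trigger a pipeline update so materialized views built on the source reflect the deletion.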
Parameterize values used when declaring tables or views with SQL
Use SET to specify a configuration value in a query that declares a table or view, including Spark configurations. Any table or view you define in a notebook after the SET statement has access to the defined value. Any Spark configurations specified using the SET statement are used when executing the Spark query for any table or view following the SET statement. To read a configuration value in a query, use the string interpolation syntax ${}. The following example sets a Spark configuration value named startDate and uses that value in a query:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
To specify multiple configuration values, use a separate SET statement for each value.
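As a minimal sketch, the following extends the example above with a second, hypothetical endDate value:
SET startDate='2025-01-01';
SET endDate='2025-12-31';
CREATE OR REFRESH MATERIALIZED VIEW filtered_range
AS SELECT * FROM src
WHERE date BETWEEN ${startDate} AND ${endDate}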
Limitations
The PIVOT clause is not supported. The pivot operation in Spark requires the eager loading of input data to compute the output schema. This capability is not supported in DLT.
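One possible workaround, shown here only as a hedged sketch, is conditional aggregation with CASE expressions; the state values and the orders_by_state_wide name are illustrative assumptions based on the datasets defined earlier:
CREATE OR REFRESH MATERIALIZED VIEW orders_by_state_wide
AS SELECT
order_date,
count(CASE WHEN state = 'CA' THEN 1 END) AS ca_orders,
count(CASE WHEN state = 'NY' THEN 1 END) AS ny_orders
FROM customer_orders
GROUP BY order_date;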
The CREATE OR REFRESH LIVE TABLE syntax to create a materialized view is deprecated. Instead, use CREATE OR REFRESH MATERIALIZED VIEW.
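For example, a definition written with the deprecated syntax should be updated as follows (my_mv is a placeholder name):
-- Deprecated syntax:
CREATE OR REFRESH LIVE TABLE my_mv
AS SELECT * FROM samples.nyctaxi.trips;
-- Preferred syntax:
CREATE OR REFRESH MATERIALIZED VIEW my_mv
AS SELECT * FROM samples.nyctaxi.trips;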