Delta Live Tables concepts
This article introduces the fundamental concepts you should understand to use Delta Live Tables effectively.
Pipelines
The main unit of execution in Delta Live Tables is a pipeline. A pipeline is a directed acyclic graph (DAG) linking data sources to target datasets. You define the contents of Delta Live Tables datasets using SQL queries or Python functions that return Spark SQL or Koalas DataFrames. A pipeline also has an associated configuration defining the settings required to run the pipeline. You can optionally specify data quality constraints when defining datasets.
You implement Delta Live Tables pipelines in Databricks notebooks. You can implement pipelines in a single notebook or in multiple notebooks. All queries in a single notebook must be implemented in either Python or SQL, but you can configure multiple-notebook pipelines with a mix of Python and SQL notebooks. Each notebook shares a storage location for output data and is able to reference datasets from other notebooks in the pipeline.
You can use Databricks Repos to store and manage your Delta Live Tables notebooks. To make a notebook managed with Databricks Repos available when you create a pipeline:
Add the comment line -- Databricks notebook source at the top of a SQL notebook.
Add the comment line # Databricks notebook source at the top of a Python notebook.
You can also use a Databricks repo to store your Python code and import it as modules in your pipeline notebook. See Import Python modules from a Databricks repo.
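For example, a pipeline notebook could import a helper function from a module stored in the repo. The following is only a sketch: the repo path and the utils.data_cleaning module with its clean_raw function are hypothetical, and the path you append depends on where your repo is checked out.
import sys
import os

# Hypothetical repo location; replace with the path where your repo is checked out.
sys.path.append(os.path.abspath("/Workspace/Repos/<user>/<repo>"))

# Hypothetical module: utils/data_cleaning.py defining a clean_raw(df) helper.
from utils.data_cleaning import clean_raw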
See Create, run, and manage Delta Live Tables pipelines to learn more about creating and running a pipeline. See Configure multiple notebooks in a pipeline for an example of configuring a multi-notebook pipeline.
Queries
Queries implement data transformations by defining a data source and a target dataset. Delta Live Tables queries can be implemented in Python or SQL.
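For example, a Python query is a function decorated with @dlt.table or @dlt.view that returns a Spark DataFrame. The following sketch assumes a JSON source path and an event_type column, and relies on the spark session that Databricks notebooks provide automatically.
import dlt
from pyspark.sql.functions import col

@dlt.table(comment="Raw events ingested from cloud storage (the path is an assumption).")
def raw_events():
    return spark.read.format("json").load("/data/events/")

@dlt.table(comment="Events filtered to rows with a known event type.")
def clean_events():
    return dlt.read("raw_events").where(col("event_type").isNotNull())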
Expectations
You use expectations to specify data quality controls on the contents of a dataset. Unlike a CHECK constraint in a traditional database, which prevents adding any records that fail the constraint, expectations provide flexibility when processing data that fails data quality requirements. This flexibility allows you to process and store data that you expect to be messy and data that must meet strict quality requirements.
You can define expectations to retain records that fail validation, drop records that fail validation, or halt the pipeline when a record fails validation.
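In the Python API these three behaviors map to the expect, expect_or_drop, and expect_or_fail decorators. A minimal sketch, assuming an upstream clean_events dataset with id and timestamp columns:
import dlt

@dlt.table
@dlt.expect("valid timestamp", "timestamp IS NOT NULL")                       # keep failing rows, record the metric
@dlt.expect_or_drop("valid id", "id IS NOT NULL")                             # drop failing rows
@dlt.expect_or_fail("no future events", "timestamp <= current_timestamp()")   # stop the update on failure
def validated_events():
    return dlt.read("clean_events")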
Pipeline settings
Pipeline settings are defined in JSON and include the parameters required to run the pipeline, including:
Libraries (in the form of notebooks) that contain the queries defining the tables and views that create the target datasets in Delta Lake.
A cloud storage location where the tables and metadata required for processing will be stored. This location is either DBFS or another location you provide.
Optional configuration for a Spark cluster where data processing will take place.
See Delta Live Tables settings for more details.
Datasets
There are two types of datasets in a Delta Live Tables pipeline: views and tables.
Views are similar to a temporary view in SQL and are an alias for some computation. A view allows you to break a complicated query into smaller or easier-to-understand queries. Views also allow you to reuse a given transformation as a source for more than one table. Views are available within a pipeline only and cannot be queried interactively.
Tables are similar to traditional materialized views. The Delta Live Tables runtime automatically creates tables in the Delta format and ensures those tables are updated with the latest result of the query that creates the table.
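In the Python API the difference is simply the decorator you apply; a minimal sketch, with the raw_orders source name and its columns assumed:
import dlt
from pyspark.sql.functions import upper, col

# A view: an intermediate computation available only within this pipeline.
@dlt.view(comment="Orders with a normalized country code.")
def orders_normalized():
    return dlt.read("raw_orders").withColumn("country", upper(col("country")))

# A table: materialized in Delta format and kept up to date by the runtime.
@dlt.table(comment="Order counts by country.")
def orders_by_country():
    return dlt.read("orders_normalized").groupBy("country").count()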
You can define a live or streaming live view or table:
A live table or view always reflects the results of the query that defines it, including when the query defining the table or view is updated, or an input data source is updated. Like a traditional materialized view, a live table or view may be entirely computed when possible to optimize computation resources and time.
A streaming live table or view processes data that has been added only since the last pipeline update. Streaming tables and views are stateful; if the defining query changes, new data will be processed based on the new query and existing data is not recomputed.
Streaming live tables are valuable for a number of use cases, including:
Data retention: a streaming live table can preserve data indefinitely, even when an input data source has low retention, for example, a streaming data source such as Apache Kafka or Amazon Kinesis (see the sketch after this list).
Data source evolution: data can be retained even if the data source changes, for example, moving from Kafka to Kinesis.
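The Kafka retention case might look like the following sketch, where the broker address and topic name are placeholders and the raw payload is simply persisted so it outlives the topic’s retention window. Reading with spark.readStream is what makes the result a streaming live table.
import dlt

@dlt.table(comment="Raw Kafka records retained indefinitely in Delta.")
def kafka_events_raw():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<broker:9092>")  # placeholder
        .option("subscribe", "<topic>")                      # placeholder
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )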
You can publish your tables to make them available for discovery and querying by downstream consumers.
Pipeline updates
After you create the pipeline and are ready to run it, you start an update. An update:
Starts a cluster with the correct configuration.
Discovers all the tables and views defined, and checks for any analysis errors such as invalid column names, missing dependencies, and syntax errors.
Creates or updates tables and views with the most recent data available.
Which tables and views are updated, and how those tables and views are updated, depends on the update type:
Refresh all: All live tables are updated to reflect the current state of their input data sources. For all streaming live tables, new rows are appended to the table.
Full refresh all: All live tables are updated to reflect the current state of their input data sources. For all streaming live tables, Delta Live Tables attempts to clear all data from each table and then load all data from the streaming source.
Refresh selection: The behavior of refresh selection is identical to refresh all, but allows you to refresh only selected tables. Selected live tables are updated to reflect the current state of their input data sources. For selected streaming live tables, new rows are appended to the table.
Full refresh selection: The behavior of full refresh selection is identical to full refresh all, but allows you to perform a full refresh of only selected tables. Selected live tables are updated to reflect the current state of their input data sources. For selected streaming live tables, Delta Live Tables attempts to clear all data from each table and then load all data from the streaming source.
For existing live tables, an update has the same behavior as a SQL REFRESH on a materialized view. For new live tables, the behavior is the same as a SQL CREATE operation.
If the pipeline is triggered, the system stops processing after refreshing all tables or selected tables in the pipeline once.
When a triggered update completes successfully, each table that is part of the update is guaranteed to be updated based on the data available when the update started.
For use cases that require low latency, you can configure a pipeline to update continuously. See Continuous and triggered pipelines for more information about choosing an execution mode for your pipeline.
Continuous and triggered pipelines
Delta Live Tables supports two different modes of execution:
Triggered pipelines update each table with whatever data is currently available and then stop the cluster running the pipeline. Delta Live Tables automatically analyzes the dependencies between your tables and starts by computing those that read from external sources. Tables within the pipeline are updated after their dependent data sources have been updated.
Continuous pipelines update tables continuously as input data changes. Once an update is started, it continues to run until manually stopped. Continuous pipelines require an always-running cluster but ensure that downstream consumers have the most up-to-date data.
Triggered pipelines can reduce resource consumption and expense since the cluster runs only long enough to execute the pipeline. However, new data won’t be processed until the pipeline is triggered. Continuous pipelines require an always-running cluster, which is more expensive but reduces processing latency.
The continuous flag in the pipeline settings controls the execution mode. Pipelines run in triggered execution mode by default. Set continuous to true if you require low latency refreshes of the tables in your pipeline.
{
...
"continuous": true,
...
}
The execution mode is independent of the type of table being computed. Both live and streaming live tables can be updated in either execution mode.
If some tables in your pipeline have weaker latency requirements, you can configure their update frequency independently by setting the pipelines.trigger.interval setting:
spark_conf={"pipelines.trigger.interval": "1 hour"}
This option does not turn off the cluster in between pipeline updates, but can free up resources for updating other tables in your pipeline.
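For example, the interval can be set on an individual table through the spark_conf parameter of the table decorator; a minimal sketch, with the source dataset name assumed:
import dlt

@dlt.table(
    spark_conf={"pipelines.trigger.interval": "1 hour"},
    comment="Hourly aggregate; other tables in the pipeline keep their own cadence."
)
def hourly_order_counts():
    return dlt.read("orders_normalized").groupBy("country").count()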
Tables and views in continuous pipelines
You can use both live tables or views and streaming live tables or views in a pipeline that runs continuously. To avoid unnecessary processing, pipelines automatically monitor dependent Delta tables and perform an update only when the contents of those dependent tables have changed.
The Delta Live Tables runtime is not able to detect changes in non-Delta data sources. The table is still updated regularly, but with a higher default trigger interval to prevent excessive recomputation from slowing down any incremental processing happening on the cluster.
Development and production modes
You can optimize pipeline execution by switching between development and production modes. Use the buttons in the Pipelines UI to switch between these two modes. By default, pipelines run in development mode.
When you run your pipeline in development mode, the Delta Live Tables system:
Reuses a cluster to avoid the overhead of restarts. By default, clusters run for two hours when development mode is enabled. You can change this with the pipelines.clusterShutdown.delay setting in the cluster configuration.
Disables pipeline retries so you can immediately detect and fix errors.
In production mode, the Delta Live Tables system:
Restarts the cluster for specific recoverable errors, including memory leaks and stale credentials.
Retries execution in the event of specific errors, for example, a failure to start a cluster.
Note
Switching between development and production modes only controls cluster and pipeline execution behavior. Storage locations and target schemas in the catalog for publishing tables must be configured as part of pipeline settings and are not affected when switching between modes.
Databricks Enhanced Autoscaling
Databricks Enhanced Autoscaling optimizes cluster utilization by automatically allocating cluster resources based on workload volume, with minimal impact to the data processing latency of your pipelines.
Enhanced Autoscaling improves on the Databricks cluster autoscaling functionality with the following features:
Enhanced Autoscaling implements optimization of streaming workloads, and adds enhancements to improve the performance of batch workloads. These optimizations result in more efficient cluster utilization, reduced resource usage, and lower cost.
Enhanced Autoscaling proactively shuts down under-utilized nodes while guaranteeing there are no failed tasks during shutdown. The existing cluster autoscaling feature scales down nodes only if the node is idle.
Enhanced Autoscaling is the default autoscaling mode when you create a new pipeline in the Delta Live Tables UI. You can enable Enhanced Autoscaling for existing pipelines by editing the pipeline settings in the UI. You can also enable Enhanced Autoscaling when you create or edit pipelines with the Delta Live Tables API.
Enable Enhanced Autoscaling
To use Enhanced Autoscaling, do one of the following:
Set Cluster mode to Enhanced autoscaling when you create a pipeline or edit a pipeline in the Delta Live Tables UI.
Add the autoscale configuration to the pipeline default cluster and set the mode field to ENHANCED. The following example configures an Enhanced Autoscaling cluster with a minimum of 5 workers and a maximum of 10 workers. max_workers must be greater than or equal to min_workers.
Note
Enhanced Autoscaling is available for the default cluster only. If you include the autoscale configuration in the maintenance cluster configuration, the existing cluster autoscaling feature is used.
The autoscale configuration has two modes:
LEGACY: Use cluster autoscaling.
ENHANCED: Use Enhanced Autoscaling.
{
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 5,
"max_workers": 10,
"mode": "ENHANCED"
}
}
]
}
If the pipeline is continuous, it is automatically restarted after the autoscaling configuration changes. After the restart, expect a short period of increased latency. Following this brief period, the cluster size should be updated based on your autoscale configuration, and pipeline latency should return to its previous characteristics.
Monitoring Enhanced Autoscaling enabled pipelines
You can use the Delta Live Tables event log to monitor Enhanced Autoscaling metrics. You can view the metrics in the user interface. Enhanced Autoscaling events have the autoscale event type. The following are example events:
Cluster resize request started
Cluster resize request succeeded
Cluster resize request partially succeeded
Cluster resize request failed
You can also view Enhanced Autoscaling events by directly querying the event log:
To query the event log for backlog metrics, see Backlog metrics.
To monitor cluster resizing requests and responses during Enhanced Autoscaling operations, see Databricks Enhanced Autoscaling events.
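For example, if your pipeline uses the default storage layout, the event log is stored as a Delta table under the pipeline storage location; the following sketch assumes that layout and uses a placeholder for the storage path.
# The spark session is provided automatically in Databricks notebooks.
event_log = spark.read.format("delta").load("<storage-location>/system/events")

# Filter to Enhanced Autoscaling events.
autoscale_events = event_log.where("event_type = 'autoscale'")
display(autoscale_events.select("timestamp", "message"))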
Product editions
You can use the Delta Live Tables product edition option to run your pipeline with the features best suited for the pipeline requirements. The following product editions are available:
Core to run streaming ingest workloads. Select the Core edition if your pipeline doesn’t require advanced features such as change data capture (CDC) or Delta Live Tables expectations.
Pro to run streaming ingest and CDC workloads. The Pro product edition supports all of the Core features, plus support for workloads that require updating tables based on changes in source data.
Advanced to run streaming ingest workloads, CDC workloads, and workloads that require expectations. The Advanced product edition supports the features of the Core and Pro editions, and also supports enforcement of data quality constraints with Delta Live Tables expectations.
You can select the product edition when you create or edit a pipeline. You can select a different edition for each pipeline.
If your pipeline includes features not supported by the selected product edition, for example, expectations, you will receive an error message with the reason for the error. You can then edit the pipeline to select the appropriate edition.