Transform data with Delta Live Tables

This article describes how you can use Delta Live Tables to declare transformations on datasets and specify how records are processed through query logic. It also includes examples of common transformation patterns that are useful when building Delta Live Tables pipelines.

You can define a dataset against any query that returns a DataFrame. You can use Apache Spark built-in operations, UDFs, custom logic, and MLflow models as transformations in your Delta Live Tables pipeline. Once data has been ingested into your Delta Live Tables pipeline, you can define new datasets against upstream sources to create new streaming tables, materialized views, and views.

To learn how to effectively perform stateful processing with Delta Live Tables, see Optimize stateful processing in Delta Live Tables with watermarks.

When to use views, materialized views, and streaming tables

To ensure your pipelines are efficient and maintainable, choose the best dataset type when you implement your pipeline queries.

Consider using a view when:

  • You have a large or complex query that you want to break into easier-to-manage queries.

  • You want to validate intermediate results using expectations.

  • You want to reduce storage and compute costs and do not require the materialization of query results. Because tables are materialized, they require additional computation and storage resources.

Consider using a materialized view when:

  • Multiple downstream queries consume the table. Because views are computed on demand, a view is recomputed every time it is queried.

  • Other pipelines, jobs, or queries consume the table. Because views are not materialized, you can only use them in the same pipeline.

  • You want to view the results of a query during development. Because tables are materialized and can be viewed and queried outside of the pipeline, using tables during development can help validate the correctness of computations. After validating, convert queries that do not require materialization into views.
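The compute trade-off described in the two lists above can be modeled in plain Python. This is a hypothetical sketch, not Delta Live Tables code: the illustrative `View` class reruns its query function on every read, while the illustrative `MaterializedView` class runs the query once per refresh and serves stored results. With two downstream consumers, the view computes twice and the materialized view computes once.

```python
from typing import Callable, List, Optional


class View:
    """Hypothetical model of a view: the query runs on every read."""

    def __init__(self, query: Callable[[], List[int]]):
        self.query = query
        self.computations = 0

    def read(self) -> List[int]:
        self.computations += 1
        return self.query()


class MaterializedView:
    """Hypothetical model of a materialized view: results are stored at refresh time."""

    def __init__(self, query: Callable[[], List[int]]):
        self.query = query
        self.computations = 0
        self._stored: Optional[List[int]] = None

    def refresh(self) -> None:
        self.computations += 1
        self._stored = self.query()

    def read(self) -> List[int]:
        if self._stored is None:
            raise RuntimeError("materialized view has not been refreshed yet")
        return self._stored


source = [3, 1, 4, 1, 5]


def expensive_query() -> List[int]:
    return sorted(x * 10 for x in source)


view = View(expensive_query)
mv = MaterializedView(expensive_query)
mv.refresh()

# Two downstream consumers each read both datasets once.
for _ in range(2):
    view.read()
    mv.read()

print(view.computations, mv.computations)  # 2 1
```

The price of the materialized view is storage for `_stored`, which mirrors the storage cost mentioned for tables above.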

Consider using a streaming table when:

  • A query is defined against a data source that is continuously or incrementally growing.

  • Query results should be computed incrementally.

  • High throughput and low latency are desired for the pipeline.

Note

Streaming tables are always defined against streaming sources. You can also use streaming sources with APPLY CHANGES INTO to apply updates from CDC feeds. See Simplified change data capture with the APPLY CHANGES API in Delta Live Tables.

Combine streaming tables and materialized views in a single pipeline

Streaming tables inherit the processing guarantees of Apache Spark Structured Streaming and are configured to process queries from append-only data sources, where new rows are always inserted into the source table rather than modified.

Note

By default, streaming tables require append-only data sources. When a streaming source is another streaming table that receives updates or deletes, you can override this behavior with the skipChangeCommits flag, which makes the stream ignore those changes instead of failing.
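The append-only requirement and the effect of skipChangeCommits can be modeled in plain Python. This sketch is a hypothetical simplification, not how Structured Streaming is implemented: a source table is a list of commits tagged as `"append"`, `"update"`, or `"delete"`, and a reader consumes only commits it has not seen yet. By default the reader fails on a non-append commit; with the illustrative `skip_change_commits=True` setting it skips such commits, so those updates and deletes never reach downstream.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Commit:
    kind: str          # "append", "update", or "delete" (hypothetical model)
    rows: List[dict]


@dataclass
class StreamingReader:
    """Hypothetical model of a streaming read over a source table's commit log."""
    skip_change_commits: bool = False
    next_commit: int = 0  # stands in for the checkpointed stream position
    output: List[dict] = field(default_factory=list)

    def process(self, log: List[Commit]) -> None:
        for commit in log[self.next_commit:]:
            if commit.kind == "append":
                self.output.extend(commit.rows)
            elif not self.skip_change_commits:
                raise ValueError(
                    f"source is not append-only: found {commit.kind!r} commit"
                )
            # Reaching here with a change commit means it was skipped.
            self.next_commit += 1


log = [
    Commit("append", [{"id": 1}]),
    Commit("update", [{"id": 1, "status": "closed"}]),
    Commit("append", [{"id": 2}]),
]

strict = StreamingReader()
try:
    strict.process(log)
except ValueError as err:
    print(err)  # source is not append-only: found 'update' commit

lenient = StreamingReader(skip_change_commits=True)
lenient.process(log)
print(lenient.output)  # [{'id': 1}, {'id': 2}]
```

Note that the lenient reader's output still shows row 1 without the `"closed"` status: skipping change commits avoids the failure, but it does not propagate the update.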

A common streaming pattern includes ingesting source data to create the initial datasets in a pipeline. These initial datasets are commonly called bronze tables and often perform simple transformations.

By contrast, the final tables in a pipeline, commonly referred to as gold tables, often require complicated aggregations or reading from sources that are the targets of an APPLY CHANGES INTO operation. Because these operations inherently create updates rather than appends, they are not supported as inputs to streaming tables. These transformations are better suited for materialized views.

By mixing streaming tables and materialized views into a single pipeline, you can simplify your pipeline, avoid costly re-ingestion or re-processing of raw data, and have the full power of SQL to compute complex aggregations over an efficiently encoded and filtered dataset. The following example illustrates this type of mixed processing:

Note

These examples use Auto Loader to load files from cloud storage. To load files with Auto Loader in a Unity Catalog-enabled pipeline, you must use external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.

import dlt

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally. Replace ... with your filter condition.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "s3://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT user_id, count(*) AS count FROM LIVE.streaming_silver GROUP BY user_id
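The difference between the incremental silver table and the fully recomputed gold table in the example above can be modeled in plain Python. This is a hypothetical simplification: the illustrative `StreamingTable` class remembers how many upstream rows it has already processed (standing in for a streaming checkpoint) and reads only new rows on each update, while the illustrative `recompute_gold` function rescans all of silver every time, like the `live_gold` table.

```python
from collections import Counter
from typing import Callable, Dict, List


class StreamingTable:
    """Hypothetical model: each update reads only rows appended since the last one."""

    def __init__(self, predicate: Callable[[dict], bool]):
        self.predicate = predicate  # plays the role of .where(...)
        self.offset = 0             # stands in for the streaming checkpoint
        self.rows: List[dict] = []
        self.rows_read = 0

    def update(self, upstream: List[dict]) -> None:
        new_rows = upstream[self.offset:]
        self.rows_read += len(new_rows)
        self.rows.extend(r for r in new_rows if self.predicate(r))
        self.offset = len(upstream)


def recompute_gold(silver_rows: List[dict]) -> Dict[str, int]:
    """Hypothetical model of live_gold: a full count per user_id, rebuilt every update."""
    return dict(Counter(r["user_id"] for r in silver_rows))


bronze = [{"user_id": "a", "valid": True}, {"user_id": "b", "valid": False}]
silver = StreamingTable(lambda r: r["valid"])

silver.update(bronze)                          # reads 2 rows
bronze.append({"user_id": "a", "valid": True})
silver.update(bronze)                          # reads only the 1 new row

gold = recompute_gold(silver.rows)             # rescans all silver rows
print(silver.rows_read, gold)  # 3 {'a': 2}
```

Across two updates, silver reads each bronze row exactly once (3 reads in total), while gold's cost grows with the full size of silver on every update.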

Learn more about using Auto Loader to efficiently read JSON files from S3 for incremental processing.

Stream-static joins

Stream-static joins are a good choice when denormalizing a continuous stream of append-only data with a primarily static dimension table.

With each pipeline update, new records from the stream are joined with the most current snapshot of the static table. If records are added or updated in the static table after the corresponding data from the streaming table has been processed, the resulting records are not recalculated unless a full refresh is performed.

In pipelines configured for triggered execution, the static table returns results as of the time the update started. In pipelines configured for continuous execution, each time the table processes an update, the most recent version of the static table is queried.

The following is an example of a stream-static join:

import dlt

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  LEFT JOIN LIVE.customers USING (customer_id)
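The snapshot semantics of stream-static joins described above can be illustrated with a plain-Python model. In this hypothetical sketch, each update left-joins only the new stream records against the dimension table as it exists when the update runs, matching the left join in the example. Rows already emitted are never revisited, so a sale whose customer arrives later stays unmatched until a full refresh replays the whole stream.

```python
from typing import Dict, List


def stream_static_update(new_sales: List[dict], customers: Dict[int, str]) -> List[dict]:
    """Left-join only the new stream rows against the current static snapshot."""
    snapshot = dict(customers)  # static table as of the start of this update
    return [{**sale, "name": snapshot.get(sale["customer_id"])} for sale in new_sales]


customers = {1: "Ada"}
sales = [{"customer_id": 1}, {"customer_id": 2}]
output = stream_static_update(sales, customers)  # update 1: customer 2 unknown

customers[2] = "Grace"                            # dimension row arrives later
late_sale = [{"customer_id": 2}]
sales += late_sale
output += stream_static_update(late_sale, customers)  # update 2: only new rows

print([row["name"] for row in output])  # ['Ada', None, 'Grace']

# Only a full refresh replays every stream row against the latest snapshot.
full_refresh = stream_static_update(sales, customers)
print([row["name"] for row in full_refresh])  # ['Ada', 'Grace', 'Grace']
```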

Calculate aggregates efficiently

You can use streaming tables to incrementally calculate simple distributive aggregates like count, min, max, or sum, and algebraic aggregates like average or standard deviation. Databricks recommends incremental aggregation for queries with a limited number of groups, for example, a query with a GROUP BY country clause. Only new input data is read with each update.
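These aggregates can be maintained incrementally because count, sum, min, and max can be updated from new rows alone, and average and standard deviation can be derived from a few running totals (count, sum, and sum of squares). The following plain-Python sketch is a hypothetical model of that idea, not the Delta Live Tables implementation. It keeps one small state record per group, so state size grows with the number of groups rather than the number of rows, which is why a limited number of groups matters.

```python
import math
from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass
class GroupState:
    count: int = 0
    total: float = 0.0
    total_sq: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    def add(self, x: float) -> None:
        # Distributive aggregates: each is updated from the new value alone.
        self.count += 1
        self.total += x
        self.total_sq += x * x
        self.minimum = min(self.minimum, x)
        self.maximum = max(self.maximum, x)

    @property
    def mean(self) -> float:
        # Algebraic aggregate: derived from the running count and sum.
        return self.total / self.count

    @property
    def stddev(self) -> float:
        # Population standard deviation derived from running totals.
        variance = self.total_sq / self.count - self.mean ** 2
        return math.sqrt(max(variance, 0.0))


def update(state: Dict[str, GroupState], new_rows: Iterable[Tuple[str, float]]) -> None:
    """Fold only the rows that arrived in this update into the per-group state."""
    for country, amount in new_rows:
        state.setdefault(country, GroupState()).add(amount)


state: Dict[str, GroupState] = {}
update(state, [("US", 2.0), ("US", 4.0), ("FR", 10.0)])  # first update
update(state, [("US", 6.0)])                             # later update: one new row

us = state["US"]
print(us.count, us.total, us.mean, us.minimum, us.maximum)  # 3 12.0 4.0 2.0 6.0
```

The sum-of-squares formula is chosen here for brevity; a production implementation would typically use a numerically stabler update, and Spark's `stddev` computes the sample rather than the population value.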

Use MLflow models in a Delta Live Tables pipeline

Note

To use MLflow models in a Unity Catalog-enabled pipeline, your pipeline must be configured to use the preview channel. To use the current channel, you must configure your pipeline to publish to the Hive metastore.

You can use MLflow-trained models in Delta Live Tables pipelines. MLflow models are treated as transformations in Databricks, meaning they act upon a Spark DataFrame input and return results as a Spark DataFrame. Because Delta Live Tables defines datasets against DataFrames, you can convert Apache Spark workloads that leverage MLflow to Delta Live Tables with just a few lines of code. For more on MLflow, see ML lifecycle management using MLflow.

If you already have a Python notebook calling an MLflow model, you can adapt this code to Delta Live Tables by using the @dlt.table decorator and ensuring functions are defined to return transformation results. Delta Live Tables does not install MLflow by default, so make sure you %pip install mlflow and import mlflow and dlt at the top of your notebook. For an introduction to Delta Live Tables syntax, see Example: Ingest and process New York baby names data.

To use MLflow models in Delta Live Tables, complete the following steps:

  1. Obtain the run ID and model name of the MLflow model. The run ID and model name are used to construct the URI of the MLflow model.

  2. Use the URI to define a Spark UDF to load the MLflow model.

  3. Call the UDF in your table definitions to use the MLflow model.

The following example shows the basic syntax for this pattern:

%pip install mlflow

import dlt
import mlflow

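run_id = "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
# Wrap the MLflow model as a Spark UDF so it can be applied to DataFrame columns.
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  # Replace the placeholders with your input dataset name and feature columns.
  return (
    dlt.read("<input-data>")
      .withColumn("prediction", loaded_model_udf(<model-features>))
  )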

As a complete example, the following code defines a Spark UDF named loaded_model_udf that loads an MLflow model trained on loan risk data. The data columns used to make the prediction are passed as an argument to the UDF. The table loan_risk_predictions calculates predictions for each row in loan_risk_input_data.

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  # Parentheses let the method chain span multiple lines.
  return (
    dlt.read("loan_risk_input_data")
      .withColumn('predictions', loaded_model_udf(struct(features)))
  )

Retain manual deletes or updates

Delta Live Tables allows you to manually delete or update records in a table and then run a refresh operation to recompute downstream tables.

By default, Delta Live Tables recomputes table results based on input data each time a pipeline is updated, so you must ensure that the deleted record isn't reloaded from the source data. Setting the pipelines.reset.allowed table property to false prevents full refreshes of the table, but it does not prevent incremental writes to the table or stop new data from flowing into it.
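The interaction between a manual delete and a full refresh can be sketched in plain Python. In this hypothetical model, a full refresh clears the table and reloads everything still present in the source, which brings a manually deleted record back. When resets are disallowed, which is the role `pipelines.reset.allowed = false` plays, the full refresh leaves the table untouched while ordinary incremental updates still append new source rows. The `Table` class and `run` helper are illustrative names only.

```python
from typing import List


class Table:
    """Hypothetical model of a streaming table fed from an append-only source."""

    def __init__(self, reset_allowed: bool = True):
        self.reset_allowed = reset_allowed
        self.rows: List[str] = []
        self.offset = 0  # position in the source already ingested

    def incremental_update(self, source: List[str]) -> None:
        # New source rows always flow in, regardless of reset_allowed.
        self.rows.extend(source[self.offset:])
        self.offset = len(source)

    def full_refresh(self, source: List[str]) -> None:
        if not self.reset_allowed:
            return  # refresh is skipped, so manual edits are retained
        self.rows, self.offset = [], 0
        self.incremental_update(source)


def run(reset_allowed: bool) -> List[str]:
    source = ["alice", "bob"]
    table = Table(reset_allowed=reset_allowed)
    table.incremental_update(source)
    table.rows.remove("bob")          # manual delete
    source.append("carol")
    table.incremental_update(source)  # new data still flows in
    table.full_refresh(source)
    return table.rows


print(run(True))   # ['alice', 'bob', 'carol']  (deleted record reloaded)
print(run(False))  # ['alice', 'carol']         (manual delete retained)
```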

The following diagram illustrates an example using two streaming tables:

  • raw_user_table ingests raw user data from a source.

  • bmi_table incrementally computes BMI scores using weight and height from raw_user_table.

Suppose you want to manually delete or update user records in raw_user_table and recompute bmi_table.

Retain data diagram

The following code sets the pipelines.reset.allowed table property to false to disable full refresh for raw_user_table. Intended changes to raw_user_table are retained over time, while downstream tables are recomputed when a pipeline update is run:

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);
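The bmi_table expression converts pounds to kilograms (dividing by 2.2) and inches to meters (multiplying by 0.0254) before applying BMI = kg / m². A plain-Python version of the same arithmetic makes the units easy to check. It assumes the source columns are in pounds and inches, as the conversion factors imply. The `bmi` function is an illustrative helper, not part of the pipeline.

```python
def bmi(weight_lb: float, height_in: float) -> float:
    """Same arithmetic as the SQL: (weight/2.2) / pow(height*0.0254, 2)."""
    weight_kg = weight_lb / 2.2     # pounds to kilograms
    height_m = height_in * 0.0254   # inches to meters
    return weight_kg / height_m ** 2


print(round(bmi(154, 70), 1))  # 22.1
```

For example, 154 lb is 70 kg and 70 in is 1.778 m, so BMI = 70 / 1.778² ≈ 22.1.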