Load and process data incrementally with DLT flows

Data is processed in DLT through flows. Each flow consists of a query and, typically, a target. The flow runs the query, either as a batch or incrementally as a stream of data, and writes the result to the target. A flow lives within an ETL pipeline in Databricks.

Typically, flows are defined automatically when you create a query in DLT that updates a target, but you can also explicitly define additional flows for more complex processing, such as appending to a single target from multiple sources.

Updates

A flow runs each time its defining pipeline is updated. The flow creates or updates tables with the most recent data available. Depending on the type of flow and the state of changes to the data, the update may perform an incremental refresh, which processes only new records, or a full refresh, which reprocesses all records from the data source.

Create a default flow

When you create a DLT object in a pipeline, you typically define a table or a view along with the query that supports it. For example, in this SQL query, you create a streaming table called customers_silver by reading from the table called customers_bronze.

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

You can create the same streaming table in Python as well. In Python, you typically use DLT by creating a query function that returns a DataFrame, with decorators to access DLT functionality:

Python
import dlt

@dlt.table()
def customers_silver():
    return spark.readStream.table("customers_bronze")

In this example, you created a streaming table. You can also create materialized views with similar syntax in both SQL and Python. For more information, see Streaming tables and Materialized views.
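
For instance, a materialized view can be defined in Python with the same @dlt.table() decorator by returning a batch DataFrame instead of a streaming one. The following is a minimal sketch; the view name customers_by_region and the region column are hypothetical and not part of the examples above.

Python
import dlt

# A batch read (spark.read) instead of a streaming read (spark.readStream)
# makes this a materialized view rather than a streaming table.
@dlt.table()
def customers_by_region():
    return (
        spark.read.table("customers_silver")
            .groupBy("region")   # hypothetical column
            .count()
    )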

The customers_silver examples above create a default flow along with the streaming table. The default flow for a streaming table is an append flow, which adds new rows with each trigger. Creating a flow and its target in a single step like this is the most common way to use DLT. You can use this style to ingest data or to transform data.

Append flows also support processing that requires reading data from multiple streaming sources to update a single target. For example, you can use append flow functionality when you have an existing streaming table and flow and want to add a new streaming source that writes to this existing streaming table.

Using multiple flows to write to a single target

In the previous example, you created a flow and a streaming table in a single step. You can also create flows for a previously created table. The following example creates a table and its associated flow in separate steps. This code produces the same result as creating a default flow, including using the same name for the streaming table and the flow.

Python
import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
    target = "customers_silver")
def customers_silver():
    return spark.readStream.table("customers_bronze")

Creating a flow independently from the target means that you can also create multiple flows that append data to the same target.

Use the @append_flow decorator in the Python interface or the CREATE FLOW...INSERT INTO clause in the SQL interface to create a new flow, for example to write to a streaming table from multiple streaming sources. Use an append flow for processing tasks such as the following:

  • Add streaming sources that append data to an existing streaming table without requiring a full refresh. For example, you might have a table combining regional data from every region you operate in. As new regions are rolled out, you can add the new region data to the table without performing a full refresh. For an example of adding streaming sources to an existing streaming table, see Example: Write to a streaming table from multiple Kafka topics, and see the sketch after this list.
  • Update a streaming table by appending missing historical data (backfilling). For example, you have an existing streaming table that is written to by an Apache Kafka topic. You also have historical data stored in a table that you need inserted exactly once into the streaming table, and you cannot stream the data because your processing includes performing a complex aggregation before inserting the data. For an example of a backfill, see Example: Run a one-time data backfill.
  • Combine data from multiple sources and write to a single streaming table instead of using the UNION clause in a query. Using append flow processing instead of UNION allows you to update the target table incrementally without running a full refresh update. For an example of a union done in this way, see Example: Use append flow processing instead of UNION.
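
The first of these patterns can be sketched in Python by pointing several append flows at one streaming table, each reading a different Kafka topic. This is a minimal sketch, not the linked example: the broker address, topic names, and the orders_raw table name are placeholders, and the raw Kafka payload is written without parsing.

Python
import dlt

dlt.create_streaming_table("orders_raw")

def read_topic(topic):
    # Placeholder broker and topic names; adjust for your environment.
    return (
        spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", topic)
            .load()
    )

@dlt.append_flow(target = "orders_raw")
def orders_topic_a():
    return read_topic("orders_topic_a")

@dlt.append_flow(target = "orders_raw")
def orders_topic_b():
    return read_topic("orders_topic_b")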

The target for the records output by the append flow processing can be an existing table or a new table. For Python queries, use the create_streaming_table() function to create a target table.

The following example adds two flows for the same target, creating a union of the two source tables:

Python
import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
important
  • If you need to define data quality constraints with expectations, define the expectations on the target table as part of the create_streaming_table() function or on an existing table definition, as sketched after this list. You cannot define expectations in the @append_flow definition.
  • Flows are identified by a flow name, and this name is used to identify streaming checkpoints. The use of the flow name to identify the checkpoint means the following:
    • If an existing flow in a pipeline is renamed, the checkpoint does not carry over, and the renamed flow is effectively an entirely new flow.
    • You cannot reuse a flow name in a pipeline, because the existing checkpoint won’t match the new flow definition.
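
For example, assuming create_streaming_table() accepts expectation arguments such as expect_all_or_drop, a constraint can be attached to the target rather than to the flows that feed it. The constraint name and the customer_id column below are illustrative.

Python
import dlt

# The expectation lives on the target table, not on the append flows.
dlt.create_streaming_table(
    "customers_silver",
    expect_all_or_drop = {"valid_id": "customer_id IS NOT NULL"}  # illustrative constraint
)

@dlt.append_flow(target = "customers_silver")
def customers_from_bronze():
    return spark.readStream.table("customers_bronze")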

Types of flows

The default flows for streaming tables and materialized views are append flows. You can also create flows to read from change data capture data sources. The following table describes the different types of flows.

Flow type: Append

Description: Append flows are the most common type of flow: new records in the source are written to the target with each update. They correspond to append mode in Structured Streaming. You can add the ONCE flag to indicate a batch query whose data should be inserted into the target only once, unless the target is fully refreshed. Any number of append flows may write to a particular target.

Default flows (created with the target streaming table or materialized view) have the same name as the target. Other targets do not have default flows.

Flow type: Apply changes

Description: An apply changes flow processes a query containing change data capture (CDC) data. Apply changes flows can only target streaming tables, and the source must be a streaming source (even in the case of ONCE flows). Multiple apply changes flows can target a single streaming table. A streaming table that acts as the target of an apply changes flow can only be targeted by other apply changes flows.

For more information about CDC data, see The APPLY CHANGES APIs: Simplify change data capture with DLT.
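
As a minimal sketch of an apply changes flow in Python, the apply_changes() API can write CDC records into a streaming table created with create_streaming_table(). The source table customers_cdc_feed and the customer_id and sequence_num columns are placeholders.

Python
import dlt

dlt.create_streaming_table("customers")

# Placeholder source and columns: customers_cdc_feed is assumed to be a
# streaming source of CDC records keyed by customer_id and ordered by
# sequence_num.
dlt.apply_changes(
    target = "customers",
    source = "customers_cdc_feed",
    keys = ["customer_id"],
    sequence_by = "sequence_num"
)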

Additional information

For more information on flows and their use, see the following topics: