append_flow

The @dlt.append_flow decorator creates append flows or backfills for your DLT tables. The decorated function must return an Apache Spark streaming DataFrame. See Load and process data incrementally with DLT flows.

Append flows can target streaming tables or sinks.

Syntax

Python
import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)

Parameters

| Parameter | Type | Description |
|---|---|---|
| function | function | Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query. |
| target | str | Required. The name of the table or sink that is the target of the append flow. |
| name | str | The flow name. If not provided, defaults to the function name. |
| comment | str | A description for the flow. |
| spark_conf | dict | A dictionary of Spark configurations for the execution of this query. |
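
The relationship between target, name, and the decorated function can be modeled in plain Python. The sketch below is not the dlt implementation and does not run a pipeline; append_flow_model and FLOWS are hypothetical names that only mimic how a flow could be registered under an explicit name or, when name is omitted, under the function name.

```python
from typing import Callable, Dict, Optional, Tuple

# Hypothetical registry: flow name -> (target, comment, query function).
FLOWS: Dict[str, Tuple[str, Optional[str], Callable]] = {}

def append_flow_model(target: str, name: Optional[str] = None,
                      comment: Optional[str] = None) -> Callable:
    """Toy stand-in for @dlt.append_flow; illustrates the naming rule only."""
    def register(func: Callable) -> Callable:
        flow_name = name or func.__name__  # name defaults to the function name
        FLOWS[flow_name] = (target, comment, func)
        return func
    return register

@append_flow_model(target="events")
def from_topic_a():
    return "stream A"  # placeholder for a streaming DataFrame

@append_flow_model(target="events", name="backfill_2023")
def from_archive():
    return "stream B"  # placeholder for a streaming DataFrame

print(sorted(FLOWS))  # ['backfill_2023', 'from_topic_a']
```

Both modeled flows point at the same target, which mirrors how several append flows can write to one streaming table.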

Examples

Python
import dlt
from pyspark.sql import functions as F

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  # Serialize every column of each row into one JSON string in the "value" column
  return dlt.read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
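
The Kafka flow above packs every column into a single JSON string named value, because Spark's Kafka writer takes the message payload from a value column. As a rough illustration of the per-row result, the plain-Python sketch below approximates F.to_json(F.struct("*")) with json.dumps. The row_to_kafka_value helper and the sample row are hypothetical, and real Spark output can differ in details such as null handling.

```python
import json

def row_to_kafka_value(row: dict) -> dict:
    """Approximate to_json(struct("*")) for one row: all columns -> one JSON string."""
    # Compact separators mimic JSON output without extra whitespace.
    return {"value": json.dumps(row, separators=(",", ":"))}

sample_row = {"id": 1, "event": "click"}  # hypothetical input row
print(row_to_kafka_value(sample_row))  # {'value': '{"id":1,"event":"click"}'}
```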