append_flow

The @dlt.append_flow decorator creates append flows or backfills for your Lakeflow Declarative Pipelines tables. The decorated function must return an Apache Spark streaming DataFrame, unless once=True is set, in which case it must return a batch DataFrame. See Load and process data incrementally with Lakeflow Declarative Pipelines flows.

Append flows can target streaming tables or sinks.

Syntax

Python
import dlt

dlt.create_streaming_table("<target-table-name>")  # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>",  # optional, defaults to the function name
  once = <boolean>,  # optional, defaults to False
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"},  # optional
  comment = "<comment>")  # optional
def <function-name>():
  return (<streaming-query>)
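
Several append flows can write to the same target, which is useful when combining multiple sources into one streaming table. The sketch below is one way to register such flows with the syntax above; it is an illustration, not code from the Lakeflow documentation. The helper name register_append_flows, the flow-naming scheme, and the source table names are hypothetical, and the dlt module and spark session are passed in as arguments only so the helper can be exercised outside a pipeline.

```python
def register_append_flows(dlt, spark, target, sources):
    """Register one append flow per source table, all writing to `target`.

    `dlt` and `spark` are parameters (an assumption made for testability);
    in a pipeline, pass the imported `dlt` module and the `spark` session.
    Returns the list of flow names that were registered.
    """
    # Create the target streaming table (skip this if it is defined elsewhere).
    dlt.create_streaming_table(target)

    def make_flow(source, flow_name):
        # A factory function gives each flow its own `source` binding,
        # avoiding the late-binding closure pitfall of defining flows in a loop.
        @dlt.append_flow(target=target, name=flow_name)
        def flow():
            return spark.readStream.table(source)
        return flow

    flow_names = []
    for source in sources:
        # Hypothetical naming scheme: <target>_from_<source with dots replaced>
        flow_name = f"{target}_from_{source.replace('.', '_')}"
        make_flow(source, flow_name)
        flow_names.append(flow_name)
    return flow_names
```

Inside a pipeline source file, this might be called after import dlt as register_append_flows(dlt, spark, "events", ["raw.events_us", "raw.events_eu"]), where the table names are placeholders.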

Parameters

Parameter

Type

Description

function

function

Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query.

target

str

Required. The name of the table or sink that is the target of the append flow.

name

str

The flow name. If not provided, defaults to the function name.

once

bool

Optional. Defines the flow as a one-time flow, such as a backfill. Defaults to False. Using once=True changes the flow in two ways:

  • The return value, <streaming-query>, must be a batch DataFrame in this case, not a streaming DataFrame.
  • The flow runs one time by default. If the pipeline is updated with a complete refresh, the once flow runs again to recreate the data.

comment

str

A description for the flow.

spark_conf

dict

Optional. A dictionary of Spark configurations for the execution of this query.
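
The once rule in the table above (a batch DataFrame when once=True, a streaming DataFrame otherwise) can be checked in a unit test before a pipeline runs. The sketch below assumes the object returned by the flow function exposes PySpark's DataFrame.isStreaming property. The helper name check_flow_result is hypothetical and is not part of dlt.

```python
def check_flow_result(df, once=False):
    """Raise ValueError if `df` is not the kind of DataFrame the flow needs.

    once=False -> a streaming DataFrame is expected (df.isStreaming is True)
    once=True  -> a batch DataFrame is expected (df.isStreaming is False)
    """
    is_streaming = bool(df.isStreaming)
    if once and is_streaming:
        raise ValueError(
            "once=True flows must return a batch DataFrame, e.g. from spark.read"
        )
    if not once and not is_streaming:
        raise ValueError(
            "append flows must return a streaming DataFrame, e.g. from spark.readStream"
        )
    # Return the DataFrame unchanged so the check can wrap a return expression.
    return df
```

A flow function could wrap its result, for example return check_flow_result(spark.read.format("json").load("/path/to/backfill/"), once=True), so a mismatch fails with a clear message.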

Examples

Python
import dlt
from pyspark.sql import functions as F

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dlt.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
  # once = True, so this returns a batch DataFrame (spark.read, not spark.readStream)
  return (
    spark.read
      .format("json")
      .load("/path/to/backfill/")
  )

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dlt.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  # Serialize all columns of each row into a JSON string in the "value" column
  return spark.readStream.table("xxx").select(F.to_json(F.struct("*")).alias("value"))
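
The Kafka flow above packs every column into one JSON string because Spark's Kafka writer takes the message body from a column named value. As a rough per-row picture of what F.to_json(F.struct("*")) produces, the plain-Python sketch below applies the same idea to a dictionary. The row contents and the helper name to_kafka_value are made up for illustration, and Spark's handling of nulls and type formatting may differ.

```python
import json


def to_kafka_value(row):
    """Return a dict with a single "value" key holding the row as compact JSON."""
    # separators=(",", ":") drops the spaces json.dumps adds by default
    return {"value": json.dumps(row, separators=(",", ":"))}


record = to_kafka_value({"id": 1, "event": "click"})
print(record["value"])  # {"id":1,"event":"click"}
```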