append_flow
The `@dp.append_flow` decorator creates append flows or backfills for your Lakeflow Declarative Pipelines tables. The decorated function must return an Apache Spark streaming DataFrame. See Load and process data incrementally with Lakeflow Declarative Pipelines flows.
Append flows can target streaming tables or sinks.
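For example, multiple append flows can write to the same streaming table. The following is a minimal sketch, assuming two hypothetical source tables, `raw_orders_east` and `raw_orders_west`:

from pyspark import pipelines as dp

# Both flows append to the same target streaming table.
dp.create_streaming_table("all_orders")

@dp.append_flow(target = "all_orders")
def orders_east():
  return spark.readStream.table("raw_orders_east") # hypothetical source table

@dp.append_flow(target = "all_orders")
def orders_west():
  return spark.readStream.table("raw_orders_west") # hypothetical source table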
Syntax
from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <boolean>, # optional, defaults to False
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>)
Parameters
Parameter | Type | Description |
---|---|---|
`function` | `function` | Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query. |
`target` | `str` | Required. The name of the table or sink that is the target of the append flow. |
`name` | `str` | The flow name. If not provided, defaults to the function name. |
`once` | `bool` | Optionally, define the flow as a one-time flow, such as a backfill. Using `once = True` runs the flow only one time; the flow runs again if the target is fully refreshed. Defaults to `False`. |
`comment` | `str` | A description for the flow. |
`spark_conf` | `dict` | A list of Spark configurations for the execution of this query. |
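
The optional parameters can be combined on a single flow. The following is a minimal sketch; the target table `events`, the flow name `events_ingest`, and the source table `raw_events` are hypothetical:

from pyspark import pipelines as dp

dp.create_streaming_table("events")

@dp.append_flow(
  target = "events",
  name = "events_ingest",
  spark_conf = {"spark.sql.shuffle.partitions": "64"},
  comment = "Appends raw events to the events table.")
def ingest_events():
  return spark.readStream.table("raw_events") # hypothetical source table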
Examples
from pyspark import pipelines as dp
from pyspark.sql import functions as F
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
  return (
    spark.readStream
      .format("json")
      .load("/path/to/backfill/")
  )
# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "kafka_flow", target = "my_kafka_sink")
def myFlow():
  return (
    spark.readStream.table("<source-table-name>")
      .select(F.to_json(F.struct("*")).alias("value"))
  )
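
In the Kafka example, `F.to_json(F.struct("*"))` serializes all columns of each row into a single JSON string aliased as `value`, the column that the Kafka sink writes as the message payload.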