update_flow
The update_flow API is in Public Preview.
Use the @dp.update_flow decorator to create an update flow. Update flows write to sinks using update output mode, emitting only the rows that changed in each batch. Unlike append flows, they support stateful aggregations without requiring a watermark.
Update flows can only target sinks. Delta tables are not supported.
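The update output mode described above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the pipeline implementation: it keeps a running count per key across micro-batches and emits only the keys whose count changed in each batch. The function name `update_mode_batches` and the sample event names are hypothetical.

```python
from collections import Counter

def update_mode_batches(batches):
    """Yield, per micro-batch, only the rows whose running count changed."""
    running = Counter()  # aggregation state carried across batches
    for batch in batches:
        increments = Counter(batch)   # keys touched by this batch
        running.update(increments)    # fold the increments into the running totals
        # Emit the new totals for touched keys only; untouched keys are not re-emitted.
        yield {key: running[key] for key in sorted(increments)}

# Hypothetical input: three micro-batches of event types.
batches = [["click", "view", "click"], ["view"], ["purchase", "click"]]
emitted = list(update_mode_batches(batches))
```

In this sketch the second batch emits only the `view` row, because `click` did not change in that batch.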
Syntax
from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
    return (<streaming-query>)
Parameters
| Parameter | Type | Description |
|---|---|---|
| function | function | Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query. |
| target | str | Required. The name of the sink this flow writes to. |
| name | str | The flow name. If not provided, defaults to the function name. |
| comment | str | A description for the flow. |
| spark_conf | dict | A dict of Spark configurations for the execution of this query. These configurations override configurations set for the destination, pipeline, or cluster. |
| import_checkpoint | str | An external checkpoint path to import before starting the flow. Imported only once, when the flow's checkpoint directory does not yet exist. |
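The override rule for spark_conf can be pictured as a dictionary merge in which flow-level keys win. The following is a plain-Python sketch of that rule only, not how the pipeline runtime resolves configuration. The helper `effective_conf` and the sample values are hypothetical, and the destination, pipeline, and cluster settings are lumped into a single inherited dict because their relative order is not stated here.

```python
def effective_conf(inherited_conf, flow_conf=None):
    """Return the settings for one flow; flow-level keys win on collisions."""
    merged = dict(inherited_conf)    # copy so the inherited settings are not mutated
    merged.update(flow_conf or {})   # flow-level values replace inherited ones
    return merged

# Hypothetical settings inherited from the destination, pipeline, or cluster.
inherited = {
    "spark.sql.shuffle.partitions": "200",
    "pipelines.trigger.interval": "1 minute",
}
# Hypothetical flow-level spark_conf.
flow = {"pipelines.trigger.interval": "5 minutes"}

resolved = effective_conf(inherited, flow)
```

Keys that the flow does not set keep their inherited values, so a flow only needs to list the settings it wants to change.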
Examples
Aggregation to a Kafka sink
Write stateful aggregation results to a Kafka sink:
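from pyspark import pipelines as dp
from pyspark.sql.functions import col

# broker_address, input_topic, and output_topic are assumed to be defined elsewhere.
dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .selectExpr("CAST(key AS STRING) AS event_type")
        # Running count per event type (stateful aggregation).
        .groupBy(col("event_type"))
        .count()
        # The Kafka sink requires a value column; key is optional.
        .selectExpr("event_type AS key", "CAST(`count` AS STRING) AS value")
    )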
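Because update output mode emits only changed rows, a consumer of the output topic would typically treat each record as an upsert keyed by event type. The following is a minimal plain-Python sketch of that consumer-side logic, assuming each record carries the event type as its key and the count as a string value. The function `apply_updates` and the sample records are hypothetical, and no Kafka client is involved.

```python
def apply_updates(latest, records):
    """Upsert (key, value) records into latest; newer records replace older ones."""
    for key, value in records:
        latest[key] = int(value)  # the most recent count for a key wins
    return latest

# Hypothetical records in arrival order, as (key, value) pairs.
records = [
    ("click", "2"),
    ("view", "1"),
    ("view", "2"),
    ("click", "3"),
    ("purchase", "1"),
]
latest_counts = apply_updates({}, records)
```

Summing the values instead of replacing them would overcount, since each record already holds the running total for its key.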
Real-time mode
Use spark_conf to configure an update flow for real-time mode:
from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
    )
Limitations
- Delta table sinks are not supported as targets for update flows.