
update_flow


The update_flow API is in Public Preview.

Use the @dp.update_flow decorator to create an update flow. Update flows write to sinks using update output mode, emitting only the rows that changed in each batch. Unlike append flows, they support stateful aggregations without requiring a watermark.
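The following plain-Python model shows what "emitting only the rows that changed" means for a running count. It is a conceptual sketch, not the pipelines API. The micro-batches are hard-coded lists, and the helper name `run_update_mode` is hypothetical. Each batch updates the running counts, and only the keys that batch touched are emitted.

```python
from collections import Counter
from typing import Dict, Iterable, List


def run_update_mode(batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """Simulate update output mode for a grouped count (hypothetical helper).

    For each micro-batch, the running state is updated, and only the
    groups whose counts changed in that batch are emitted.
    """
    state: Counter = Counter()  # running count per key, kept across batches
    outputs: List[Dict[str, int]] = []
    for batch in batches:
        batch_counts = Counter(batch)
        state.update(batch_counts)  # add this batch's counts to the state
        # Emit new totals only for keys that appeared in this batch.
        outputs.append({key: state[key] for key in sorted(batch_counts)})
    return outputs


if __name__ == "__main__":
    batches = [["click", "view", "click"], ["view"], ["purchase", "click"]]
    for i, rows in enumerate(run_update_mode(batches)):
        print(f"batch {i}: {rows}")
    # batch 0: {'click': 2, 'view': 1}
    # batch 1: {'view': 2}
    # batch 2: {'click': 3, 'purchase': 1}
```

By contrast, complete output mode would re-emit every key in every batch. Update mode emits only view after the second batch because it is the only count that changed.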

Update flows can only target sinks defined with dp.create_sink(). Delta table sinks are not supported.

Syntax

Python
from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
    target = "<sink-name>",
    name = "<flow-name>",  # optional, defaults to function name
    spark_conf = {"<key>" : "<value>", "<key>" : "<value>"},  # optional
    comment = "<comment>",  # optional
    import_checkpoint = "<checkpoint-path>")  # optional
def <function-name>():
    return (<streaming-query>)

Parameters

| Parameter | Type | Description |
|---|---|---|
| function | function | Required. The decorated function, which returns an Apache Spark streaming DataFrame from a user-defined query. |
| target | str | Required. The name of the sink this flow writes to. |
| name | str | Optional. The flow name. Defaults to the function name. |
| comment | str | Optional. A description of the flow. |
| spark_conf | dict | Optional. Spark configurations used when executing this flow's query. These values override configurations set for the destination, the pipeline, or the cluster. |
| import_checkpoint | str | Optional. The path of an external checkpoint to import before the flow starts. The checkpoint is imported only once, when the flow's checkpoint directory does not yet exist. |
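The spark_conf and import_checkpoint behaviors described in the table can be modeled in plain Python. This is a conceptual sketch, not how the pipelines runtime is implemented. The helper names `resolve_flow_conf` and `prepare_checkpoint` are hypothetical, and "importing" a checkpoint is modeled as a directory copy. The relative order of the cluster, pipeline, and destination layers is also an assumption. The table only states that flow-level values override all three.

```python
import os
import shutil
import tempfile
from typing import Dict, Optional


def resolve_flow_conf(
    cluster: Dict[str, str],
    pipeline: Dict[str, str],
    destination: Dict[str, str],
    flow: Dict[str, str],
) -> Dict[str, str]:
    """Merge configuration layers so that later layers win (hypothetical helper).

    ASSUMPTION: cluster < pipeline < destination is an illustrative order.
    The documented behavior is only that flow-level spark_conf wins over all three.
    """
    merged: Dict[str, str] = {}
    for layer in (cluster, pipeline, destination, flow):
        merged.update(layer)  # keys in later layers overwrite earlier ones
    return merged


def prepare_checkpoint(checkpoint_dir: str, import_checkpoint: Optional[str]) -> bool:
    """Import an external checkpoint at most once (hypothetical helper).

    Returns True only when the import actually happened.
    """
    if import_checkpoint is None or os.path.exists(checkpoint_dir):
        # Nothing to import, or the flow already has its own checkpoint:
        # leave the existing state untouched.
        return False
    shutil.copytree(import_checkpoint, checkpoint_dir)  # model the import as a copy
    return True


if __name__ == "__main__":
    print(resolve_flow_conf(
        cluster={"spark.sql.shuffle.partitions": "200"},
        pipeline={"spark.sql.shuffle.partitions": "64"},
        destination={},
        flow={"spark.sql.shuffle.partitions": "8"},
    ))  # {'spark.sql.shuffle.partitions': '8'}

    with tempfile.TemporaryDirectory() as tmp:
        external = os.path.join(tmp, "external_checkpoint")
        os.makedirs(external)
        flow_dir = os.path.join(tmp, "flow_checkpoint")
        print(prepare_checkpoint(flow_dir, external))  # True: first start imports
        print(prepare_checkpoint(flow_dir, external))  # False: directory already exists
```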

Examples

Aggregation to a Kafka sink

Write stateful aggregation results to a Kafka sink:

Python
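from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Placeholder connection settings; replace with your own values.
broker_address = "<broker-address>"
input_topic = "<input-topic>"
output_topic = "<output-topic>"

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        # Use the Kafka message key as the event type.
        .selectExpr("CAST(key AS STRING) AS event_type")
        # Stateful aggregation: running count per event type.
        .groupBy(col("event_type"))
        .count()
        # The Kafka sink writes key and value columns; value is required.
        .select(
            col("event_type").alias("key"),
            col("count").cast("string").alias("value"),
        )
    )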
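Because update mode emits a new row each time a group's count changes, the output topic receives several messages for the same key over time, each carrying the latest total. A downstream consumer can therefore treat the topic as a changelog and keep only the last value per key. The sketch below models this in plain Python. Hard-coded (key, value) pairs stand in for consumed Kafka records, no Kafka client is used, `latest_counts` is a hypothetical helper, and each value is assumed to hold the count encoded as a string.

```python
from typing import Dict, Iterable, Tuple


def latest_counts(messages: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Fold (key, value) messages into the latest count per key (hypothetical helper).

    ASSUMPTION: each value is the running count encoded as a string.
    Later messages for the same key overwrite earlier ones.
    """
    latest: Dict[str, int] = {}
    for key, value in messages:
        latest[key] = int(value)
    return latest


if __name__ == "__main__":
    # Messages in the order an update-mode query might produce them:
    # the first batch emits click and view, the second re-emits only click.
    consumed = [("click", "2"), ("view", "1"), ("click", "3")]
    print(latest_counts(consumed))  # {'click': 3, 'view': 1}
```

If only the latest count per key matters, Kafka log compaction on the output topic may be worth considering, since compaction retains at least the last message for each key.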

Real-time mode

Use spark_conf to configure an update flow for real-time mode:

Python
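from pyspark import pipelines as dp

# Placeholder connection settings; replace with your own values.
broker_address = "<broker-address>"
input_topic = "<input-topic>"
output_topic = "<output-topic>"

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        # Run this flow in real-time mode.
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    },
)
def my_real_time_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        # Forward only key and value; otherwise source columns such as
        # partition would also be passed to the Kafka sink.
        .select("key", "value")
    )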

Limitations

  • Delta table sinks are not supported as targets for update flows.