Use real-time mode in Lakeflow Spark Declarative Pipelines

Public Preview

Real-time mode in Lakeflow Spark Declarative Pipelines is in Public Preview on Databricks Runtime 18.1.2 on the preview channel.

Real-time mode enables ultra-low-latency data processing, with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection and real-time personalization.

Real-time mode is also available directly in Structured Streaming outside of pipelines. See Real-time mode in Structured Streaming.

How real-time mode achieves low latency

Real-time mode differs from standard continuous processing in three key ways:

  • Long-running batches: The system processes data as it becomes available in the source within long-running batches (default is five minutes).
  • Simultaneous stage scheduling: All query stages are scheduled at the same time. The compute resource must have enough available task slots to cover all stages concurrently. See Compute sizing.
  • Streaming shuffle: Data is passed between stages as soon as it is produced, rather than waiting for an upstream stage to complete before starting the downstream stage.
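As a rough intuition for the streaming shuffle, the following plain-Python toy contrasts a stage barrier with record-at-a-time handoff. It is not Spark code, and the function names are made up for illustration.

```python
def source(records, trace):
    """Upstream stage: emits records one at a time and logs each emission."""
    for record in records:
        trace.append(f"produce {record}")
        yield record


def barrier_pipeline(records):
    """Downstream stage starts only after the upstream stage has finished."""
    trace = []
    staged = list(source(records, trace))  # materialize all upstream output first
    for record in staged:
        trace.append(f"consume {record}")
    return trace


def streaming_pipeline(records):
    """Downstream stage consumes each record as soon as it is produced."""
    trace = []
    for record in source(records, trace):
        trace.append(f"consume {record}")
    return trace


print(barrier_pipeline([1, 2]))
print(streaming_pipeline([1, 2]))
```

In the barrier version every record waits for the whole upstream stage to finish. In the streaming version the first record is consumed before the second is produced, which is the property that keeps per-record latency low.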

The checkpoint interval (configured via pipelines.trigger.interval) controls how frequently state and source offsets are persisted to durable storage. Longer intervals reduce checkpointing overhead but increase recovery time after a failure and delay metrics reporting. Shorter intervals improve durability but add overhead.

Real-time mode and continuous pipelines

Real-time mode is a specialized type of continuous trigger, so the pipeline must run in continuous mode before you can use it. Real-time mode then applies additional flow-level optimizations to achieve sub-second latency beyond what standard continuous processing provides.

Enabling real-time mode requires three configuration steps:

  1. Set the pipeline to continuous mode.
  2. Enable real-time mode at the pipeline level.
  3. Define a real-time update flow.

Requirements

| Requirement | Value |
| --- | --- |
| Databricks Runtime | 18.1.2 on the SDP preview channel |
| Compute type | Classic compute or serverless |

Configure real-time mode

Step 1: Set the pipeline to continuous mode

In your pipeline settings, set Pipeline mode to Continuous, or set it in the pipeline JSON:

JSON
{
  "continuous": true
}

Step 2: Enable real-time mode at the pipeline level

In your pipeline settings, add the following key to the Spark configuration under Advanced > Spark config:

ini
spark.databricks.streaming.realTimeMode.enabled = true

You can also set this in the pipeline JSON:

JSON
{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}
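Before deploying, it can help to check that a settings file carries both pipeline-level prerequisites. The following is a minimal sketch in plain Python. The helper name `real_time_mode_problems` is hypothetical and not part of any Databricks API. It only inspects the two keys shown above.

```python
import json

RTM_KEY = "spark.databricks.streaming.realTimeMode.enabled"


def real_time_mode_problems(settings: dict) -> list:
    """Return human-readable problems; an empty list means both settings are present."""
    problems = []
    # Step 1: the pipeline must run in continuous mode.
    if settings.get("continuous") is not True:
        problems.append('"continuous" must be true')
    # Step 2: the pipeline-level Spark configuration must enable real-time mode.
    enabled = settings.get("spark_conf", {}).get(RTM_KEY)
    if str(enabled).lower() != "true":
        problems.append(f'spark_conf["{RTM_KEY}"] must be "true"')
    return problems


settings = json.loads("""
{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}
""")
print(real_time_mode_problems(settings))  # prints []
```

The check does not cover step 3, because the real-time update flow is defined in pipeline source code rather than in the settings JSON.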

Step 3: Define a real-time update flow

Real-time mode requires an update flow. Use dp.create_sink() to define the output target, then use the @dp.update_flow decorator with pipelines.trigger set to "RealTime" and target pointing to the sink.

Python
from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<bootstrap-servers>")
        .option("subscribe", "<input-topic>")
        .load()
    )

Flow-level configuration parameters:

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| pipelines.trigger | Yes | | Set to "RealTime" to enable real-time mode for this flow. |
| pipelines.trigger.interval | No | "5 minutes" | Checkpoint interval. Controls how often state and offsets are committed. Shorter values improve recoverability; longer values reduce overhead. |
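When several flows share the same settings, the flow-level `spark_conf` dictionary can be built in one place. The sketch below is plain Python. The helper name `real_time_flow_conf` is hypothetical, and it only assembles the two documented keys plus any extra entries you pass in.

```python
def real_time_flow_conf(checkpoint_interval=None, extra_conf=None):
    """Build a flow-level spark_conf dict for a real-time update flow."""
    conf = {"pipelines.trigger": "RealTime"}  # required for real-time mode
    if checkpoint_interval is not None:
        # Optional; when omitted, the documented default of "5 minutes" applies.
        conf["pipelines.trigger.interval"] = checkpoint_interval
    if extra_conf:
        conf.update(extra_conf)
    return conf


print(real_time_flow_conf("1 minute", {"spark.sql.shuffle.partitions": "8"}))
```

The result can be passed as the `spark_conf` argument of the update flow decorator shown in step 3.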

Code examples

Kafka to Kafka

Read from a Kafka topic and write to a Kafka output target:

Python
from pyspark import pipelines as dp

# broker_address, input_topic, and output_topic are placeholders that you
# define elsewhere in the pipeline source.
dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enrich with a broadcast join

Join a Kafka stream against a static lookup table. Only broadcast (stream-to-static) joins are supported. Stream-to-stream joins are not supported in real-time mode.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

# broker_address, input_topic, and enriched_output_topic are placeholders
# that you define elsewhere in the pipeline source.
dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    # Static lookup table, broadcast to every task for the join
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .withColumn("event_key", expr("CAST(value AS STRING)"))
        .join(broadcast(lookup), expr("event_key = lookup_key"))
        # The Kafka sink requires a value column, so serialize the enriched fields as JSON
        .selectExpr(
            "event_key AS key",
            "to_json(struct(event_key, lookup_value, timestamp)) AS value",
        )
    )

Aggregation

Count events by key using a stateful groupBy. Set spark.sql.shuffle.partitions to match the input partition count for stateful operations:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# broker_address, input_topic, and output_topic are placeholders that you
# define elsewhere in the pipeline source.
dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
        .groupBy(col("event_type"))
        .count()
        # The Kafka sink requires a value column, so emit the count as the message value
        .selectExpr("event_type AS key", "CAST(`count` AS STRING) AS value")
    )

Supported sources and sinks

| Connector | As source | As sink | Notes |
| --- | --- | --- | --- |
| Apache Kafka | ✓ | ✓ | |
| AWS MSK | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Azure Event Hubs (Kafka connector) | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Amazon Kinesis | ✓ | Not supported | Supported in EFO (Enhanced Fan-Out) mode only. |
| Delta | Not supported | Not supported | |

Compute sizing

You can run one real-time pipeline per compute resource if the compute has enough task slots. Available task slots must cover all tasks across all query stages.

| Pipeline type | Configuration | Required task slots |
| --- | --- | --- |
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 | 8 |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 | 28 (8 + 20) |
| Three-stage (Kafka source + two shuffles) | maxPartitions = 8, two shuffle stages of 20 each | 48 (8 + 20 + 20) |

If you don't set maxPartitions, use the number of partitions in the Kafka topic as the source stage's task count.
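The sizing arithmetic in the table can be sketched as a small calculator. This is plain Python with hypothetical helper names. It assumes, as the table does, that the requirement is the source task count plus the partition count of every shuffle stage.

```python
def required_task_slots(source_partitions, shuffle_partitions=()):
    """Sum the tasks of all stages, because every stage is scheduled at the same time."""
    return source_partitions + sum(shuffle_partitions)


def has_enough_slots(available_slots, source_partitions, shuffle_partitions=()):
    """True when the compute can run every stage of the query concurrently."""
    return available_slots >= required_task_slots(source_partitions, shuffle_partitions)


print(required_task_slots(8))            # single-stage stateless: 8
print(required_task_slots(8, [20]))      # two-stage stateful: 28
print(required_task_slots(8, [20, 20]))  # three-stage: 48
```

For example, `has_enough_slots(32, 8, [20])` is true, while `has_enough_slots(32, 8, [20, 20])` is false, so the three-stage query would need more task slots or fewer partitions.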

Operator support

| Category | Operator | Supported |
| --- | --- | --- |
| Stateless | Selection, Projection | ✓ |
| UDFs | Scala UDF | ✓ (with limitations) |
| UDFs | Python UDF | ✓ (with limitations) |
| Aggregation | sum, count, max, min, avg | ✓ |
| Windowing | Tumbling, Sliding | ✓ |
| Windowing | Session | Not supported |
| Deduplication | dropDuplicates | ✓ (unbounded state) |
| Deduplication | dropDuplicatesWithinWatermark | Not supported |
| Joins | Broadcast table join | ✓ |
| Joins | Stream-to-stream join | Not supported |
| Custom | transformWithState | ✓ (with behavioral differences) |
| Custom | union | ✓ (with limitations) |
| Custom | foreach | Not supported |
| Custom | flatMapGroupsWithState | Not supported |
| Custom | mapPartitions | Not supported |
| Custom | foreachBatch | Not supported |

transformWithState in real-time mode

transformWithState is supported in real-time mode with the following differences from micro-batch processing:

  • handleInputRows is invoked once per row rather than once per key per batch. The inputRows iterator yields a single value per invocation.
  • Event-time timers are not supported. Processing-time timers fire when a long-running batch terminates if no data has arrived.
  • transformWithStateInPandas is not supported.
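The per-row invocation pattern can be illustrated with a toy model in plain Python. This is not the Spark API: `RunningCount`, `drive_micro_batch`, and `drive_real_time` are hypothetical stand-ins that only mimic how often the handler is called and how many rows each call receives.

```python
from collections import defaultdict


class RunningCount:
    """Toy stateful processor that keeps a per-key row count."""

    def __init__(self):
        self.state = defaultdict(int)
        self.calls = 0

    def handle_input_rows(self, key, rows):
        self.calls += 1
        # Fold each row into state; never assume the iterator holds a whole batch.
        for _ in rows:
            self.state[key] += 1


def drive_micro_batch(processor, batch):
    """Micro-batch style: one call per key, with all of that key's rows."""
    grouped = defaultdict(list)
    for key, value in batch:
        grouped[key].append(value)
    for key, values in grouped.items():
        processor.handle_input_rows(key, iter(values))


def drive_real_time(processor, batch):
    """Real-time style: one call per row, with a single-element iterator."""
    for key, value in batch:
        processor.handle_input_rows(key, iter([value]))


events = [("a", 1), ("b", 2), ("a", 3)]
micro, real_time = RunningCount(), RunningCount()
drive_micro_batch(micro, events)
drive_real_time(real_time, events)
print(micro.calls, real_time.calls)                # 2 3
print(dict(micro.state) == dict(real_time.state))  # True
```

A handler that updates state incrementally reaches the same result either way. Logic that relies on seeing all rows for a key in a single call, such as computing a per-batch maximum from the iterator alone, would behave differently under per-row invocation.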

Pandas UDFs in real-time mode

To minimize latency with pandas UDFs, set spark.sql.execution.arrow.maxRecordsPerBatch to 1. This optimizes for latency at the expense of throughput. If throughput is also important, set this value to 100 or higher.

Monitor real-time mode performance

Real-time mode exposes latency metrics in StreamingQueryProgress under the latencies field. Access these metrics via a StreamingQueryListener or by inspecting the lastProgress property on the streaming query.

| Metric | Description |
| --- | --- |
| processingLatencyMs | Time between when a record is read by the flow and when it is fully processed by the flow |
| sourceQueuingLatencyMs | Time between when a record is successfully written to the message bus (for example, log append time in Kafka) and when it is first read by the flow |
| e2eLatencyMs | Total end-to-end latency from when the record is produced at the source to when it is fully processed by the flow |

Each metric is reported as p50, p90, p95, and p99 percentiles.
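Once a progress record is available as a dictionary, a latency check might look like the sketch below. The nesting used here, `latencies` → metric name → percentile, is an assumption inferred from the field and metric names above and is not a confirmed schema. The helper name `latency_breaches` and the sample numbers are illustrative only.

```python
def latency_breaches(progress, thresholds_ms, percentile="p99"):
    """Return the metrics whose chosen percentile exceeds its threshold.

    Assumes progress["latencies"][metric][percentile] holds milliseconds;
    adjust the lookups if the actual layout differs.
    """
    latencies = progress.get("latencies") or {}
    breaches = {}
    for metric, limit_ms in thresholds_ms.items():
        value = (latencies.get(metric) or {}).get(percentile)
        if value is not None and value > limit_ms:
            breaches[metric] = value
    return breaches


# Illustrative sample only, not real measurements.
sample_progress = {
    "latencies": {
        "processingLatencyMs": {"p50": 3, "p90": 6, "p95": 8, "p99": 15},
        "e2eLatencyMs": {"p50": 20, "p90": 45, "p95": 60, "p99": 140},
    }
}
print(latency_breaches(sample_progress, {"processingLatencyMs": 50, "e2eLatencyMs": 100}))
```

Missing metrics are skipped rather than treated as errors, so the same check can run against progress records from flows that do not report every metric.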

Limitations

One real-time flow per pipeline is recommended. Multiple flows are allowed, but task slot contention across flows increases latency.

For a complete list of operator and source limitations, see Real-time mode limitations.

Additional resources