Use real-time mode in Lakeflow Spark Declarative Pipelines
Real-time mode in Lakeflow Spark Declarative Pipelines is in Public Preview on Databricks Runtime 18.1.2 on the preview channel.
Real-time mode enables ultra-low-latency data processing, with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection and real-time personalization.
Real-time mode is also available directly in Structured Streaming outside of pipelines. See Real-time mode in Structured Streaming.
How real-time mode achieves low latency
Real-time mode differs from standard continuous processing in three key ways:
- Long-running batches: Data is processed as soon as it becomes available in the source, within long-running batches (five minutes by default).
- Simultaneous stage scheduling: All query stages are scheduled at the same time. The compute resource must have enough available task slots to cover all stages concurrently. See Compute sizing.
- Streaming shuffle: Data is passed between stages as soon as it is produced, rather than waiting for an upstream stage to complete before starting the downstream stage.
The checkpoint interval (configured via pipelines.trigger.interval) controls how frequently state and source offsets are persisted to durable storage. Longer intervals reduce checkpointing overhead but increase recovery time after a failure and delay metrics reporting. Shorter intervals improve durability but add overhead.
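Here is a back-of-the-envelope sketch of this trade-off. It assumes that after a failure the flow resumes from the last persisted offsets. Under that assumption, a failure just before a checkpoint means re-reading roughly one full interval of input. The name checkpoint_tradeoff and the 10,000 records/s rate are hypothetical.

```python
def checkpoint_tradeoff(records_per_second: float, interval_seconds: float) -> dict:
    """Estimate checkpoint frequency and worst-case replay for a checkpoint interval.

    Assumption: recovery restarts from the last checkpointed offsets, so a failure
    just before a checkpoint means re-reading about one full interval of input.
    """
    if records_per_second < 0 or interval_seconds <= 0:
        raise ValueError("rate must be >= 0 and interval must be > 0")
    return {
        # More checkpoints per hour means more checkpointing overhead
        "checkpoints_per_hour": 3600 / interval_seconds,
        # Records that may have to be reprocessed after a failure
        "worst_case_replay_records": records_per_second * interval_seconds,
    }


# Hypothetical input rate of 10,000 records/s
print(checkpoint_tradeoff(10_000, 5 * 60))
# {'checkpoints_per_hour': 12.0, 'worst_case_replay_records': 3000000}
print(checkpoint_tradeoff(10_000, 60))
# {'checkpoints_per_hour': 60.0, 'worst_case_replay_records': 600000}
```

Shortening the interval from five minutes to one minute cuts the worst-case replay by a factor of five. The cost is five times as many checkpoints per hour.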
Real-time mode and continuous pipelines
Real-time mode is a specialized type of continuous trigger, so the pipeline must run in continuous mode. Real-time mode then applies additional flow-level optimizations that achieve sub-second latency beyond what standard continuous processing provides.
Enabling real-time mode requires three configuration steps:
- Set the pipeline to continuous mode.
- Enable real-time mode at the pipeline level.
- Define a real-time update flow.
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 on the SDP preview channel |
| Compute type | Classic compute or serverless |
Configure real-time mode
Step 1: Set the pipeline to continuous mode
In your pipeline settings, set Pipeline mode to Continuous, or set it in the pipeline JSON:
```json
{
  "continuous": true
}
```
Step 2: Enable real-time mode at the pipeline level
In your pipeline settings, add the following key to the Spark configuration under Advanced > Spark config:
spark.databricks.streaming.realTimeMode.enabled = true
You can also set this in the pipeline JSON:
```json
{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}
```
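You can check the two pipeline-level steps before deploying. The following is a minimal sketch that assumes the settings are available as a Python dict shaped like the pipeline JSON above. check_rtm_pipeline_settings is a hypothetical helper, not a Databricks API. It does not cover Step 3, which lives in the pipeline source code.

```python
import json
from typing import Any, Dict, List

# Spark config key from Step 2
RTM_ENABLED_KEY = "spark.databricks.streaming.realTimeMode.enabled"


def check_rtm_pipeline_settings(settings: Dict[str, Any]) -> List[str]:
    """Return the pipeline-level real-time mode settings that are missing (empty if none)."""
    problems: List[str] = []
    # Step 1: the pipeline must run in continuous mode
    if settings.get("continuous") is not True:
        problems.append('set "continuous": true')
    # Step 2: real-time mode must be enabled in spark_conf (values are strings)
    spark_conf = settings.get("spark_conf") or {}
    if str(spark_conf.get(RTM_ENABLED_KEY, "")).lower() != "true":
        problems.append(f'set "{RTM_ENABLED_KEY}": "true" in spark_conf')
    return problems


settings = json.loads("""
{
  "continuous": true,
  "spark_conf": {"spark.databricks.streaming.realTimeMode.enabled": "true"}
}
""")
print(check_rtm_pipeline_settings(settings))  # []
```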
Step 3: Define a real-time update flow
Real-time mode requires an update flow. Use dp.create_sink() to define the output target, then use the @dp.update_flow decorator with pipelines.trigger set to "RealTime" and target pointing to the sink.
```python
from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    },
)


# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    },
)
def my_real_time_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<bootstrap-servers>")
        .option("subscribe", "<input-topic>")
        .load()
        # Keep only the columns the Kafka sink writes (value is required, key is optional)
        .select("key", "value")
    )
```
Flow-level configuration parameters:
| Parameter | Required | Default | Description |
|---|---|---|---|
| pipelines.trigger | Yes | n/a | Set to "RealTime" to run the flow in real-time mode. |
| pipelines.trigger.interval | No | 5 minutes | Checkpoint interval. Controls how often state and offsets are committed. Shorter values improve recoverability; longer values reduce overhead. |
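If several flows share the same settings, a small helper can prevent the required key from being forgotten. This is a minimal sketch. rtm_flow_conf is a hypothetical helper, not part of the pipelines API, and it only builds the dict passed to spark_conf.

```python
from typing import Dict, Mapping, Optional

# Keys from the flow-level configuration table
TRIGGER_KEY = "pipelines.trigger"
INTERVAL_KEY = "pipelines.trigger.interval"


def rtm_flow_conf(interval: Optional[str] = None,
                  extra: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build a spark_conf dict for a real-time update flow."""
    conf = {TRIGGER_KEY: "RealTime"}  # required for real-time mode
    if interval is not None:
        conf[INTERVAL_KEY] = interval  # optional; omitting it keeps the 5-minute default
    for key, value in (extra or {}).items():
        # Refuse to silently override the trigger settings handled above
        if key in (TRIGGER_KEY, INTERVAL_KEY):
            raise ValueError(f"{key} is set by rtm_flow_conf; do not pass it in extra")
        conf[key] = value
    return conf


print(rtm_flow_conf("5 minutes", {"spark.sql.shuffle.partitions": "8"}))
# {'pipelines.trigger': 'RealTime', 'pipelines.trigger.interval': '5 minutes', 'spark.sql.shuffle.partitions': '8'}
```

For example, passing spark_conf=rtm_flow_conf("5 minutes") to @dp.update_flow produces the same settings as the Step 3 example.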
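Code examples
In the following examples, broker_address, input_topic, output_topic, and enriched_output_topic are placeholders for your Kafka bootstrap servers and topic names. Define them before the flow definitions.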
Kafka to Kafka
Read from a Kafka topic and write to a Kafka output target:
```python
from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})


@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    },
)
def kafka_rtm_flow():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .option("startingOffsets", "latest")
        .load()
        # The Kafka sink requires a value column; key is optional
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )
```
Enrich with a broadcast join
Join a Kafka stream against a static lookup table. Only broadcast (stream-to-static) joins are supported. Stream-to-stream joins are not supported in real-time mode.
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})


@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    },
)
def enriched_events():
    # Static lookup table with lookup_key and lookup_value columns
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .withColumn("event_key", expr("CAST(value AS STRING)"))
        # Stream-to-static join; broadcast() ships the lookup table to every task
        .join(broadcast(lookup), expr("event_key = lookup_key"))
        # The Kafka sink requires a value column, so pack the output fields into JSON
        .selectExpr(
            "event_key AS key",
            "to_json(struct(event_key, lookup_value, timestamp)) AS value",
        )
    )
```
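Conceptually, an inner stream-to-static join enriches each event independently against a copy of the static table. The plain-Python model below illustrates that behavior. It is not Spark code, and enrich and the sample data are hypothetical. A stream-to-stream join must buffer rows from both inputs in state. Here, nothing is kept between events, and events without a matching lookup_key are dropped.

```python
from typing import Dict, Iterable, Iterator, Tuple


def enrich(events: Iterable[Tuple[str, str]],
           lookup: Dict[str, str]) -> Iterator[Tuple[str, str, str]]:
    """Model of an inner stream-to-static join.

    events: (event_key, timestamp) pairs arriving one at a time.
    lookup: the static table as lookup_key -> lookup_value, held in memory
    the way a broadcast copy is available to every task.
    """
    for event_key, timestamp in events:
        # Each event is matched on arrival; nothing is buffered between events
        if event_key in lookup:
            yield event_key, lookup[event_key], timestamp
        # Events with no matching lookup_key are dropped (inner join)


lookup_table = {"k1": "gold", "k2": "silver"}
stream = [("k1", "t1"), ("k3", "t2"), ("k2", "t3")]
print(list(enrich(stream, lookup_table)))
# [('k1', 'gold', 't1'), ('k2', 'silver', 't3')]
```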
Aggregation
Count events by key using a stateful groupBy. Set spark.sql.shuffle.partitions to match the input partition count for stateful operations:
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})


@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        # Should equal the input topic's partition count; 8 is an example value
        "spark.sql.shuffle.partitions": "8",
    },
)
def event_counts():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
        .groupBy(col("event_type"))
        .count()
        # The Kafka sink requires a value column: emit each key with its running count
        .selectExpr("event_type AS key", "CAST(`count` AS STRING) AS value")
    )
```
Supported sources and sinks
| Connector | As source | As sink | Notes |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | |
| AWS MSK | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Azure Event Hubs (Kafka connector) | ✓ | ✓ | Uses the Kafka-compatible interface. |
| Amazon Kinesis | ✓ | Not supported | Supported as a source in EFO (Enhanced Fan-Out) mode only. |
| Delta | Not supported | Not supported | |
Compute sizing
You can run one real-time pipeline per compute resource, provided the compute has enough task slots. All query stages are scheduled at the same time, so the available task slots must cover every task across all stages.
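| Pipeline type | Configuration | Required task slots |
|---|---|---|
| Single-stage stateless (Kafka source + sink) | 8 source partitions | 8 |
| Two-stage stateful (Kafka source + shuffle) | 8 source partitions, 20 shuffle partitions | 28 (8 + 20) |
| Three-stage (Kafka source + two shuffles) | 8 source partitions, 20 shuffle partitions per shuffle | 48 (8 + 20 + 20) |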
If you don't set maxPartitions, count one source task per partition in the Kafka topic.
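All stages run concurrently, so the required slot count is the sum of tasks across stages. The minimal sketch below reproduces the sizing figures for the three pipeline types. required_task_slots is a hypothetical helper.

```python
from typing import Sequence


def required_task_slots(source_partitions: int,
                        shuffle_partitions_per_stage: Sequence[int] = ()) -> int:
    """Task slots needed when every stage of one real-time flow runs at once.

    source_partitions: tasks in the source stage (maxPartitions, or the Kafka
    topic's partition count if maxPartitions is not set).
    shuffle_partitions_per_stage: task count of each downstream shuffle stage.
    """
    if source_partitions <= 0 or any(p <= 0 for p in shuffle_partitions_per_stage):
        raise ValueError("partition counts must be positive")
    return source_partitions + sum(shuffle_partitions_per_stage)


print(required_task_slots(8))            # 8
print(required_task_slots(8, [20]))      # 28
print(required_task_slots(8, [20, 20]))  # 48
```

On classic compute, each executor provides its core count divided by spark.task.cpus (1 by default) in slots. For example, a hypothetical cluster of 4 workers with 8 cores each offers 32 slots. That is enough for the two-stage flow (28) but not the three-stage flow (48).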
Operator support
| Category | Operator | Supported |
|---|---|---|
| Stateless | Selection, Projection | ✓ |
| UDFs | Scala UDF | ✓ (with limitations) |
| UDFs | Python UDF | ✓ (with limitations) |
| Aggregation | sum, count, max, min, avg | ✓ |
| Windowing | Tumbling, Sliding | ✓ |
| Windowing | Session | Not supported |
| Deduplication | | ✓ (unbounded state) |
| Deduplication | | Not supported |
| Joins | Broadcast table join | ✓ |
| Joins | Stream-to-stream join | Not supported |
| Custom | transformWithState | ✓ (with behavioral differences) |
| Custom | | ✓ (with limitations) |
| Custom | | Not supported |
| Custom | | Not supported |
| Custom | | Not supported |
| Custom | | Not supported |
transformWithState in real-time mode
transformWithState is supported in real-time mode with the following differences from micro-batch processing:
- handleInputRows is invoked once per row rather than once per key per batch. The inputRows iterator yields a single value per invocation.
- Event-time timers are not supported. Processing-time timers fire when a long-running batch terminates if no data has arrived.
- transformWithStateInPandas is not supported.
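The first difference changes how a handler sees its input. The plain-Python model below mimics the two invocation patterns. It is not the transformWithState API, and count_rows and run are hypothetical names. In this model, the final counts match, but real-time mode emits an intermediate result for every row. This suggests that handlers should keep running results in state rather than assume all of a key's rows arrive in one call.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterator, List, Tuple

Output = Tuple[str, int]
Handler = Callable[[str, Iterator[int], Dict[str, int]], List[Output]]


def count_rows(key: str, input_rows: Iterator[int], state: Dict[str, int]) -> List[Output]:
    """Toy handler: add every row it receives to the key's count, then emit the total."""
    for _ in input_rows:
        state[key] += 1
    return [(key, state[key])]


def run(rows: List[Tuple[str, int]], handler: Handler, per_row: bool) -> Tuple[int, List[Output]]:
    """Call handler once per key per batch (per_row=False) or once per row (per_row=True)."""
    state: Dict[str, int] = defaultdict(int)
    outputs: List[Output] = []
    calls = 0
    if per_row:
        # Real-time mode: each call sees an iterator holding a single value
        for key, value in rows:
            calls += 1
            outputs.extend(handler(key, iter([value]), state))
    else:
        # Micro-batch: rows are grouped by key and each key is handled once
        grouped: Dict[str, List[int]] = defaultdict(list)
        for key, value in rows:
            grouped[key].append(value)
        for key, values in grouped.items():
            calls += 1
            outputs.extend(handler(key, iter(values), state))
    return calls, outputs


batch = [("a", 1), ("a", 2), ("b", 3)]
print(run(batch, count_rows, per_row=False))  # (2, [('a', 2), ('b', 1)])
print(run(batch, count_rows, per_row=True))   # (3, [('a', 1), ('a', 2), ('b', 1)])
```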
Pandas UDFs in real-time mode
To minimize latency with pandas UDFs, set spark.sql.execution.arrow.maxRecordsPerBatch to 1. This optimizes for latency at the expense of throughput. If throughput is also important, set this value to 100 or higher.
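The trade-off comes from how many Arrow batches a task must process, and therefore how many pandas UDF invocations occur. This rough sketch assumes a scalar pandas UDF that is called once per batch. arrow_batches is a hypothetical helper.

```python
def arrow_batches(num_records: int, max_records_per_batch: int) -> int:
    """Arrow record batches needed to pass num_records to a pandas UDF within one task."""
    if num_records < 0 or max_records_per_batch <= 0:
        raise ValueError("this sketch only models a positive per-batch limit")
    # Ceiling division: a final partial batch still counts as one batch
    return (num_records + max_records_per_batch - 1) // max_records_per_batch


print(arrow_batches(1_000, 1))    # 1000: each record travels in its own batch
print(arrow_batches(1_000, 100))  # 10
```

With a limit of 1, no record waits for others to fill a batch. However, per-batch overhead is paid for every record, which is why throughput drops.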
Monitor real-time mode performance
Real-time mode exposes latency metrics in StreamingQueryProgress under the latencies field. Access these metrics via a StreamingQueryListener or by inspecting the lastProgress property on the streaming query.
| Metric | Description |
|---|---|
| Time between when a record is read by the flow and when it is fully processed by the flow |
| Time between when a record is successfully written to the message bus (for example, log append time in Kafka) and when it is first read by the flow |
| Total end-to-end latency from when the record is produced at the source to when it is fully processed by the flow |
Each metric is reported as p50, p90, p95, and p99 percentiles.
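The exact percentile computation is not specified here. The sketch below uses the nearest-rank method only to illustrate what the p50 through p99 figures mean for a set of per-record latencies. latency_percentiles and the sample data are hypothetical.

```python
from typing import Dict, Sequence


def latency_percentiles(samples_ms: Sequence[float]) -> Dict[str, float]:
    """p50/p90/p95/p99 of latency samples using the nearest-rank method."""
    if not samples_ms:
        raise ValueError("need at least one sample")
    ordered = sorted(samples_ms)
    n = len(ordered)
    result: Dict[str, float] = {}
    for p in (50, 90, 95, 99):
        rank = -(-p * n // 100)  # ceil(p * n / 100) in integer arithmetic, 1-based
        result[f"p{p}"] = ordered[rank - 1]
    return result


# 100 hypothetical end-to-end latencies of 1..100 ms
print(latency_percentiles(list(range(1, 101))))
# {'p50': 50, 'p90': 90, 'p95': 95, 'p99': 99}
```

A p99 of 99 ms means that 99 percent of the sampled records completed within 99 ms. The tail percentiles are usually the figures to watch for latency-sensitive workloads.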
Limitations
One real-time flow per pipeline is recommended. Multiple flows are allowed, but task slot contention across flows increases latency.
For a complete list of operator and source limitations, see Real-time mode limitations.