Set up real-time mode
This page describes the prerequisites and configuration needed to run real-time mode queries in Structured Streaming. For a step-by-step tutorial, see Tutorial: Run a real-time streaming workload. For conceptual information about real-time mode, see Real-time mode in Structured Streaming.
Prerequisites
To use real-time mode, you must configure your compute to meet the following requirements:
- Use dedicated access mode on classic compute. Standard access mode, Lakeflow Spark Declarative Pipelines, and serverless clusters are not supported.
- Use Databricks Runtime 16.4 LTS and above.
- Turn off autoscaling.
- Turn off Photon.
- Set `spark.databricks.streaming.realTimeMode.enabled` to `true`.
- Turn off spot instances to avoid interruptions.
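If you set the flag in the cluster's Spark configuration (under the compute's advanced options), the entry is a plain key-value pair. A sketch of the relevant line:

```ini
spark.databricks.streaming.realTimeMode.enabled true
```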
For instructions on creating and configuring classic compute, see Compute configuration reference.
Query configuration
To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.
Python

```python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the checkpoint interval.
        .trigger(realTime="5 minutes")
        .start()
)
```
Scala

```scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", inputTopic)
  .load()
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", outputTopic)
  .option("checkpointLocation", checkpointLocation)
  .outputMode("update")
  .trigger(RealTimeTrigger.apply())
  // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
  // For example, this indicates a checkpoint interval of 5 minutes:
  // .trigger(RealTimeTrigger.apply("5 minutes"))
  .start()
```
Compute sizing
You can run one real-time job per compute resource, provided the compute has enough task slots.
To run in low-latency mode, the total number of available task slots must be greater than or equal to the total number of tasks across all stages of the query.
Slot calculation examples
| Pipeline type | Configuration | Required slots |
|---|---|---|
| Single-stage stateless (Kafka source + sink) | 8 source tasks | 8 slots |
| Two-stage stateful (Kafka source + shuffle) | 8 source tasks, 20 shuffle tasks | 28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | 8 source tasks, 20 shuffle tasks, 20 repartition tasks | 48 slots (8 + 20 + 20) |
If you don't set `maxPartitions`, the number of Kafka source tasks equals the number of partitions in the Kafka topic.
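The sizing rule above is simple arithmetic: the required slot count is the sum of tasks across all query stages. A minimal sketch in plain Python (helper names are hypothetical, not a Databricks API) that reproduces the table's numbers:

```python
def required_slots(tasks_per_stage):
    """Total task slots a real-time query needs: the sum of tasks across all stages."""
    return sum(tasks_per_stage)

def can_run_low_latency(available_slots, tasks_per_stage):
    """Low-latency mode requires available slots >= total tasks across all stages."""
    return available_slots >= required_slots(tasks_per_stage)

# The examples from the table above:
print(required_slots([8]))          # single-stage stateless -> 8
print(required_slots([8, 20]))      # two-stage stateful     -> 28
print(required_slots([8, 20, 20]))  # three-stage            -> 48
```

For example, a cluster exposing 32 task slots could run the two-stage pipeline (28 slots) in low-latency mode, but not the three-stage one (48 slots).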