Set up real-time mode
This page describes the prerequisites and configuration needed to run real-time mode queries in Structured Streaming. For a step-by-step tutorial, see Tutorial: Run a real-time streaming workload. For conceptual information about real-time mode, see Real-time mode in Structured Streaming.
Prerequisites
To use real-time mode, you must configure your compute to meet the following requirements:
- Use classic compute. Dedicated and standard access modes are supported. Standard access mode is supported for Python only. Lakeflow Spark Declarative Pipelines and serverless clusters are not supported.
- Use Databricks Runtime 16.4 LTS and above.
- Turn off autoscaling.
- Turn off Photon.
- Set `spark.databricks.streaming.realTimeMode.enabled` to `true`.
- Turn off spot instances to avoid interruptions.
For latency-sensitive workloads with UDFs, Databricks recommends that you use dedicated access mode. See Table functions.
For instructions on creating and configuring classic compute, see Compute configuration reference.
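Putting the prerequisites together, a classic cluster set up for real-time mode might carry a Spark configuration like the following sketch. Enter it in the cluster's Spark config before starting the cluster (the `realTimeMode` flag is from this page; treat the exact placement as an assumption about your cluster setup):

```ini
# Required: enable real-time mode for Structured Streaming queries on this cluster
spark.databricks.streaming.realTimeMode.enabled true
```

Autoscaling, Photon, and spot instances are turned off through the cluster UI or API settings rather than Spark config.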
Query configuration
To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.
Python:

```python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)
```
Scala:

```scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("subscribe", inputTopic)
  .load()
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerAddress)
  .option("topic", outputTopic)
  .option("checkpointLocation", checkpointLocation)
  .outputMode("update")
  .trigger(RealTimeTrigger.apply())
  // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
  // For example, this code indicates a checkpoint interval of 5 minutes:
  // .trigger(RealTimeTrigger.apply("5 minutes"))
  .start()
```
Compute sizing
You can run one real-time job per compute resource if the compute has enough task slots.
To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.
Slot calculation examples
| Pipeline type | Configuration | Required slots |
|---|---|---|
| Single-stage stateless (Kafka source + sink) | 8 source tasks (for example, `maxPartitions` set to 8) | 8 slots |
| Two-stage stateful (Kafka source + shuffle) | 8 source tasks and 20 shuffle partitions | 28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | 8 source tasks, 20 shuffle partitions, and a repartition to 20 | 48 slots (8 + 20 + 20) |
If you don't set `maxPartitions`, the number of source tasks equals the number of partitions in the Kafka topic.
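The sizing rule above reduces to simple arithmetic: a query needs enough slots for every task in every stage at once. The helper below is a hypothetical sketch (not a Databricks API) that reproduces the table's numbers from assumed per-stage task counts:

```python
def required_slots(stage_task_counts):
    """Total task slots a real-time query needs: the sum of tasks
    across all of its stages, since all stages run concurrently."""
    return sum(stage_task_counts)


# Single-stage stateless pipeline: 8 source tasks (maxPartitions = 8)
print(required_slots([8]))

# Two-stage stateful pipeline: 8 source tasks + 20 shuffle partitions
print(required_slots([8, 20]))

# Three-stage pipeline: 8 source tasks + 20 shuffle partitions + repartition to 20
print(required_slots([8, 20, 20]))
```

Compare the result against the total task slots on your cluster (cores per worker times number of workers, by default) before starting the query.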