
Real-time mode in Structured Streaming

Preview

This feature is in Public Preview.

This page describes real-time mode, a trigger type for Structured Streaming that enables ultra-low latency data processing with end-to-end latency as low as 5 ms. This mode is designed for operational workloads that require immediate response to streaming data.

Real-time mode is available in Databricks Runtime 16.4 LTS and above.

Operational workloads

Streaming workloads can be broadly divided into analytical workloads and operational workloads:

  • Analytical workloads involve data ingestion and transformation, typically following the medallion architecture (for example, ingesting data into the bronze, silver, and gold tables).
  • Operational workloads consume real-time data, apply business logic, and trigger downstream actions or decisions.

Some examples of operational workloads are:

  • Blocking or flagging a credit card transaction in real time if a fraud score exceeds a threshold, based on factors like unusual location, large transaction size, or rapid spending patterns.
  • Delivering a promotional message when clickstream data shows a user has been browsing for jeans for five minutes, offering a 25% discount if they purchase in the next 15 minutes.

In general, operational workloads are characterized by the need for sub-second end-to-end latency, which can be achieved with real-time mode in Apache Spark Structured Streaming.

How real-time mode achieves low latency

Real-time mode improves the execution architecture by:

  • Running long-running batches (the default is 5 minutes) in which data is processed as it becomes available in the source.
  • Scheduling all stages of the query simultaneously. This requires the number of available task slots to be equal to or greater than the total number of tasks across all stages in a batch.
  • Passing data between stages as soon as it is produced, using a streaming shuffle.

At the end of processing a batch, and before the next batch starts, Structured Streaming checkpoints progress and makes metrics for the last batch available. If the batches are longer, these activities might be less frequent, leading to longer replays in the case of failure and delay in the availability of metrics. On the other hand, if the batches are smaller, these activities become more frequent, potentially affecting latency. Databricks recommends you benchmark real-time mode against your target workload and requirements to find the appropriate trigger interval.

Cluster configuration

To use real-time mode in Structured Streaming, you must configure a classic Lakeflow Jobs cluster:

  1. In your Databricks workspace, click New in the upper-left corner. Choose More and click Cluster.

  2. Clear Photon acceleration.

  3. Clear Enable autoscaling.

  4. Under Advanced performance, clear Use spot instances.

  5. Under Advanced, for Access mode, click Manual and select Dedicated (formerly: Single user).

  6. Under Spark, enter the following under Spark config:

    spark.databricks.streaming.realTimeMode.enabled true
  7. Click Create.

Cluster size requirements

You can run one real-time job per cluster if the cluster has enough task slots.

To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.

Slot calculation examples

Single-stage stateless pipeline (Kafka source + sink):

If maxPartitions = 8, you need at least 8 slots. If maxPartitions isn’t set, use the number of Kafka topic partitions.

Two-stage stateful pipeline (Kafka source + shuffle):

If maxPartitions = 8 and shuffle partitions = 20, you need 8 + 20 = 28 slots.

Three-stage pipeline (Kafka source + shuffle + repartition):

With maxPartitions = 8 and two shuffle stages of 20 each, you need 8 + 20 + 20 = 48 slots.
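
The same arithmetic can be captured in a small helper. This is a minimal sketch, not part of any API; the task counts and cluster sizing below are assumed values that you would replace with your own.

Scala
// Required slots = tasks of the source stage + tasks of every shuffle stage.
def requiredSlots(sourceTasks: Int, shuffleStagePartitions: Seq[Int]): Int =
  sourceTasks + shuffleStagePartitions.sum

// Three-stage example from above: 8 + 20 + 20 = 48 slots.
val needed = requiredSlots(sourceTasks = 8, shuffleStagePartitions = Seq(20, 20))

// Compare against the cluster's total task slots (workers * task slots per worker).
// For example, 4 workers with 16 slots each provide 64 slots, which covers 48.
val available = 4 * 16
assert(available >= needed)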

Key considerations

When you configure your cluster, take the following into consideration:

  • Unlike micro-batch mode, real-time tasks can stay idle while waiting for data, so right-sizing is essential to avoid wasted resources.
  • Aim for a target utilization level (e.g., 50%) by tuning:
    • maxPartitions (for Kafka)
    • spark.sql.shuffle.partitions (for shuffle stages)
  • Databricks recommends setting maxPartitions so that each task handles multiple Kafka partitions to reduce overhead.
  • For simple one-stage jobs, adjust the number of task slots per worker to match the workload.
  • For shuffle-heavy jobs, experiment to find the minimum number of shuffle partitions that avoids backlogs, and adjust from there. The job won't be scheduled if the cluster doesn't have enough slots.
note

In Databricks Runtime 16.4 LTS and above, all real-time pipelines use checkpoint v2, which allows seamless switching between real-time and micro-batch modes.
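
For example, the same checkpoint location can be reused whether the query runs with the real-time trigger or a micro-batch trigger. The following is a minimal sketch of that pattern, assuming placeholders (brokerAddress, outputTopic, checkpointLocation, and an input DataFrame inputDF) like those used in the examples later on this page:

Scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.Trigger

// Hypothetical helper: start the same Kafka sink query with whichever trigger is passed in.
def startQuery(inputDF: DataFrame, trigger: Trigger) =
  inputDF.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokerAddress)
    .option("topic", outputTopic)
    .option("checkpointLocation", checkpointLocation) // same checkpoint for both trigger types
    .outputMode("update")
    .trigger(trigger)
    .start()

// Run in real-time mode:
// startQuery(inputDF, RealTimeTrigger.apply())
// Stop the query, then restart it in micro-batch mode against the same checkpoint:
// startQuery(inputDF, Trigger.ProcessingTime("1 second"))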

Query configuration

To run a query in low-latency mode, you must specify the real-time trigger. In addition, real-time triggers are supported only with the update output mode. For example:

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val query = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
.start()

RealTimeTrigger can also accept an argument specifying the checkpoint interval. For example, this code indicates a checkpoint interval of 5 minutes:

Scala
.trigger(RealTimeTrigger.apply("5 minutes"))

Observability

Previously, end-to-end query latency was closely tied to batch duration, making batch duration a good indicator of query latency. However, this no longer applies in real-time mode, so alternative approaches are needed to measure latency. End-to-end latency is workload-specific and sometimes can only be measured accurately with business logic. For example, if the source timestamp is written to the Kafka output, latency can be calculated as the difference between the Kafka output timestamp and the source timestamp.

You can estimate end-to-end latency in several ways based on partial information gathered during the streaming process.

Use StreamingQueryProgress

The following metrics are included in the StreamingQueryProgress event, which is automatically logged in the driver logs. You can also access them through the StreamingQueryListener's onQueryProgress() callback. QueryProgressEvent.json() and toString() include the extra real-time mode metrics.

  1. Processing latency (processingLatencyMs): The time elapsed between when the real-time mode query reads a record and when it is written to the next stage or downstream. For single-stage queries, this measures the same duration as E2E latency. This metric is reported per task.
  2. Source queuing latency (sourceQueuingLatencyMs): The time elapsed between when a record is successfully written to a message bus (for example, the log append time in Kafka) and when the record is first read by the real-time mode query. This metric is reported per task.
  3. E2E latency (e2eLatencyMs): The time between when a record is successfully written to a message bus and when the record is written downstream by the real-time mode query. This metric is aggregated per batch across all records processed by all tasks.

For example:

"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
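
These metrics are also available programmatically. The following is a minimal sketch, assuming query is the StreamingQuery handle returned by start():

Scala
// lastProgress returns the most recent StreamingQueryProgress (or null before the first batch completes).
val progress = query.lastProgress
if (progress != null) {
  // The json output includes the rtmMetrics block shown above.
  println(progress.json)
}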

Use Observe API in jobs

The Observe API helps measure latency without launching another job. If a source timestamp that approximates the source data arrival time is passed through to the sink (or you can find a way to pass one through), you can estimate each batch's latency using the Observe API:

Scala
import org.apache.spark.sql.functions._

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .writeStream, etc.

In this example, a current timestamp is recorded before outputting the entry, and latency is estimated by calculating the difference between this timestamp and the record's source timestamp. The results are included in progress reports and made available to listeners. Here is a sample output:

"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
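
You can also read the observed metrics from a listener. The following is a minimal sketch, assuming the observation name observedLatency from the example above:

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is keyed by the name passed to .observe().
    Option(event.progress.observedMetrics.get("observedLatency")).foreach { row =>
      println(s"avg=${row.getAs[Double]("avg")} ms, p99=${row.getAs[Long]("p99")} ms")
    }
  }
})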

What is supported?

Environments

| Cluster Type | Supported |
| --- | --- |
| Dedicated (formerly: single user) | Yes |
| Standard (formerly: shared) | No |
| Lakeflow Declarative Pipelines Classic | No |
| Lakeflow Declarative Pipelines Serverless | No |
| Serverless | No |

Languages

| Language | Supported |
| --- | --- |
| Scala | Yes |
| Java | Yes |
| Python | No |

Execution Modes

| Execution Mode | Supported |
| --- | --- |
| Update mode | Yes |
| Append mode | No |
| Complete mode | No |

Sources

| Source | Supported |
| --- | --- |
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (using Kafka Connector) | Yes |
| Kinesis | Yes (only EFO mode) |
| Google Pub/Sub | No |
| Apache Pulsar | No |

Sinks

| Sink | Supported |
| --- | --- |
| Apache Kafka | Yes |
| Eventhub (using Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Arbitrary sinks (using forEachWriter) | Yes |

Operators

| Operator | Supported |
| --- | --- |
| Stateless operations: selection | Yes |
| Stateless operations: projection | Yes |
| Scala UDF | Yes |
| Python UDF | No |
| Aggregation: sum | Yes |
| Aggregation: count | Yes |
| Aggregation: max | Yes |
| Aggregation: min | Yes |
| Aggregation: avg | Yes |
| Aggregation functions | Yes |
| Windowing: tumbling | Yes |
| Windowing: sliding | Yes |
| Windowing: session | No |
| Deduplication: dropDuplicates | Yes (the state is unbounded) |
| Deduplication: dropDuplicatesWithinWatermark | No |
| Stream-table join: broadcast table (should be small) | Yes |
| Stream-stream join | No |
| (flat)MapGroupsWithState | No |
| transformWithState | Yes (with some differences) |
| union | Yes (with some limitations) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | Yes |

Use transformWithState in real-time mode

For building custom stateful applications, Databricks supports transformWithState, an API in Apache Spark Structured Streaming. See Build a custom stateful application for more information about the API and code snippets.

However, there are some differences between how the API behaves in real-time mode and traditional streaming queries that leverage the micro-batch architecture.

  • In real-time mode, handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) is called for each row, and the inputRows iterator returns a single value. In micro-batch mode, it is called once for each key, and the inputRows iterator returns all values for that key in the micro-batch. Keep this difference in mind when writing your code (see the sketch after this list).
  • Event time timers are not supported in real-time mode.
  • In real-time mode, timer firing can be delayed, depending on data arrival: a timer fires when data arrives or, if no data arrives, at the end of the long-running batch. For example, if a timer is supposed to fire at 10:00:00 but no data arrives at that time, it does not fire then. If data arrives at 10:00:10, the timer fires with a 10-second delay. If no data arrives at all, the timer fires just before the long-running batch terminates.
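
As a rule, write handleInputRows so that it produces the same result whether the iterator yields one row or all rows for a key. The following is a minimal sketch of that pattern (a hypothetical CountingProcessor, not taken from this page's examples):

Scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, TTLConfig, TimeMode, TimerValues, ValueState}

// Counts records per key. Input rows are (key, sourceTimestamp); output rows are (key, count, sourceTimestamp).
class CountingProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var countState: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState("count", Encoders.scalaLong, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    // Iterating over inputRows keeps the logic correct whether it holds a single row
    // (real-time mode) or every row for the key in the batch (micro-batch mode).
    inputRows.map { case (_, sourceTimestamp) =>
      val count = (if (countState.exists()) countState.get() else 0L) + 1L
      countState.update(count)
      (key, count, sourceTimestamp)
    }
  }
}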

Limitations

Source limitation

For Kinesis, polling mode is not supported. Moreover, frequent repartitions might negatively impact latency.

Union limitation

For Union, there are some limitations:

  • Self-union is not supported:
    • Kafka: You cannot union DataFrames derived from the same source DataFrame object. Workaround: use separate DataFrames that read from the same source (see the sketch after this list).
    • Kinesis: You cannot union DataFrames derived from the same Kinesis source with the same configuration. Workaround: besides using separate DataFrames, you can assign a different consumerName option to each DataFrame.
  • Stateful operators (for example, aggregate, deduplicate, transformWithState) defined before the union are not supported.
  • Union with batch sources is not supported.
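
For the Kafka self-union workaround mentioned above, create two independent streaming DataFrames that subscribe to the same topic instead of reusing one DataFrame object. A minimal sketch, assuming brokerAddress and inputTopic placeholders:

Scala
val left = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()

val right = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()

// Safe: two separate source DataFrames, not a DataFrame unioned with itself.
val unioned = left.union(right)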

Examples

The examples below show queries that are supported.

Stateless Queries

Single-stage and multi-stage stateless queries are supported.

Kafka Source to Kafka Sink

In this example, you read from a Kafka source and write to a Kafka sink.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Repartition

In this example, you read from a Kafka source, repartition the data into 20 partitions, and write to a Kafka sink.

Due to a current implementation limitation, set the Spark configuration spark.sql.execution.sortBeforeRepartition to false before using repartition.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Stream-snapshot join (broadcast only)

In this example, you read from Kafka, join the data with a static table, and write to a Kafka sink. Note that only stream-static joins that broadcast the static table are supported, which means the static table should fit in memory.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Kinesis source to Kafka sink

In this example, you read from a Kinesis source and write to a Kafka sink.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
.format("kinesis")
.option(REGION_KEY, regionName)
.option(AWS_ACCESS_ID_KEY, awsAccessKeyId)
.option(AWS_SECRET_KEY, awsSecretAccessKey)
.option(CONSUMER_MODE_KEY, CONSUMER_MODE_EFO)
.option(CONSUMER_NAME_KEY, kinesisSourceStream.consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Union

In this example, you union two Kafka DataFrames from two different topics and write to a Kafka sink.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()

val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()

df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Stateful queries

Deduplication

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 40)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

Aggregation

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()

Union with Aggregation

In this example, you first union two Kafka DataFrames from two different topics and then do an aggregation. In the end, you write to the Kafka sink.

Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic1)
.load()

val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic2)
.load()

df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()

TransformWithState

Scala
import java.time.Duration

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._

/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/

class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _

override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}

override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2

val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)

(key, oldValue + 1, sourceTimestamp)
}
}
}

spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, 20)
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", Files.createTempDirectory("some-prefix").toFile.getName)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
note

There is a difference between how real-time mode and other execution modes in Structured Streaming execute the StatefulProcessor in transformWithState. See Use transformWithState in real-time mode.

Sinks

Writing to Postgres via foreachSink

Scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.OutputMode

/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable

/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin

private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0

private var connection: Connection = _

/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)

if (bufferSize == 0) {
return
}

var upsertStatement: PreparedStatement = null

try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key").toInt)
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}

upsertStatement.executeBatch()
connection.commit()

bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}

override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
// Disable auto-commit so that flushBuffer() can commit or roll back each batch explicitly.
connection.setAutoCommit(false)
true
}

override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}

override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}


spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
// Cast the Kafka key and value from binary to string so the writer can read them with getAs[String].
.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
.writeStream
.outputMode(OutputMode.Update())
.trigger(RealTimeTrigger.apply())
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()

Display

important

This feature is available in Databricks Runtime 17.1 and above.

Display rate source

In this example, you read from a rate source and display the streaming DataFrame in a notebook.

Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime("30 seconds"), outputMode=OutputMode.Update())