
Real-time mode in Structured Streaming

Preview

This feature is in Public Preview.

Real-time mode is a trigger type for Structured Streaming that enables ultra-low latency data processing with end-to-end latency as low as five milliseconds. Use real-time mode for operational workloads that require immediate response to streaming data, such as fraud detection, real-time personalization, and instant decision-making systems.

Real-time mode is available in Databricks Runtime 16.4 LTS and above. For step-by-step setup instructions, see Get started with real-time mode. For code examples, see Real-time mode examples.

What is real-time mode?

Operational vs. analytical workloads

Streaming workloads can be broadly divided into analytical workloads and operational workloads:

  • Analytical workloads focus on data ingestion and transformation, typically following the medallion architecture (for example, ingesting data into bronze, silver, and gold tables).
  • Operational workloads consume real-time data, apply business logic, and trigger downstream actions or decisions.

Some examples of operational workloads are:

  • Blocking or flagging a credit card transaction in real time if a fraud score exceeds a threshold, based on factors like unusual location, large transaction size, or rapid spending patterns.
  • Delivering a promotional message when clickstream data shows a user has been browsing for jeans for five minutes, offering a 25% discount if they purchase in the next 15 minutes.
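The first example boils down to a threshold check applied to each incoming event. A hypothetical sketch of the decision logic (the function name and threshold are illustrative, not part of any API):

```python
def should_block(fraud_score, threshold=0.9):
    # Hypothetical decision rule: block the transaction when the
    # model's fraud score reaches the configured threshold.
    return fraud_score >= threshold

print(should_block(0.95))  # True: block the transaction
print(should_block(0.10))  # False: let it through
```

In an operational pipeline, a check like this runs on every record as it arrives, so the value of the result decays within milliseconds.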

In general, operational workloads are characterized by the need for sub-second end-to-end latency. This can be achieved with real-time mode in Apache Spark Structured Streaming.

How real-time mode achieves low latency

Real-time mode improves the execution architecture by:

  • Executing long-running batches (the default is five minutes), in which the system processes data as it becomes available in the source.
  • Scheduling all stages of the query simultaneously. This requires the number of available task slots to be equal to or greater than the number of tasks of all the stages in a batch.
  • Passing data between stages as soon as it is produced using a streaming shuffle.

At the end of processing a batch, and before the next batch starts, Structured Streaming checkpoints progress and publishes metrics. The batch duration affects checkpointing frequency:

  • Longer batches: Less frequent checkpointing, which means longer replays on failure and delayed metrics availability.
  • Shorter batches: More frequent checkpointing, which may affect latency.

Databricks recommends benchmarking real-time mode against your target workload to find the appropriate trigger interval.

When to use real-time mode

Choose real-time mode when your use case requires:

  • Sub-second latency: Applications that need to respond to data within milliseconds, such as fraud detection systems that must block transactions in real time.
  • Operational decision-making: Systems that trigger immediate actions based on incoming data, like real-time offers, alerts, or notifications.
  • Continuous processing: Workloads where data must be processed as soon as it arrives, rather than in periodic batches.

Use micro-batch mode (the default Structured Streaming trigger) when:

  • Analytical processing: ETL pipelines, data transformations, and medallion architecture implementations where latency requirements are measured in seconds or minutes.
  • Cost optimization: Workloads where sub-second latency is not required, as real-time mode requires dedicated compute resources.
  • The checkpoint frequency matters: Applications that benefit from more frequent checkpointing for faster recovery.

Requirements and configuration

Real-time mode has specific requirements for compute setup and query configuration. This section describes the prerequisites and configuration steps needed to use real-time mode.

Prerequisites

To use real-time mode, you must meet the following requirements:

  • Databricks Runtime 16.4 LTS or above: Real-time mode is only available in DBR 16.4 LTS and later versions.
  • Dedicated compute: You must use a dedicated (formerly single user) compute. Standard (formerly shared), Lakeflow Spark Declarative Pipelines, and serverless clusters are not supported.
  • No autoscaling: Autoscaling must be disabled.
  • No Photon: Photon acceleration is not supported with real-time mode.
  • Spark configuration: You must set spark.databricks.streaming.realTimeMode.enabled to true.

Compute configuration

Configure your compute with the following settings:

  • Set spark.databricks.streaming.realTimeMode.enabled to true in the Spark configuration.
  • Disable autoscaling.
  • Disable Photon acceleration.
  • Ensure the compute is configured as a dedicated cluster (not standard, Lakeflow Spark Declarative Pipelines, or serverless).
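In the cluster's Spark configuration field, the real-time mode setting is a plain key-value pair:

```
spark.databricks.streaming.realTimeMode.enabled true
```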

For step-by-step instructions on creating and configuring compute for real-time mode, see Get started with real-time mode.

Query configuration

To run a query in real-time mode, you must enable the real-time trigger. Real-time triggers are supported only in update mode.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Compute sizing

You can run one real-time job per compute resource if the compute has enough task slots.

To run in low-latency mode, the total number of available task slots must be greater than or equal to the number of tasks across all query stages.

Slot calculation examples

| Pipeline type | Configuration | Required slots |
| --- | --- | --- |
| Single-stage stateless (Kafka source + sink) | maxPartitions = 8 | 8 slots |
| Two-stage stateful (Kafka source + shuffle) | maxPartitions = 8, shuffle partitions = 20 | 28 slots (8 + 20) |
| Three-stage (Kafka source + shuffle + repartition) | maxPartitions = 8, two shuffle stages of 20 each | 48 slots (8 + 20 + 20) |

If you don't set maxPartitions, the number of source tasks equals the number of partitions in the Kafka topic.
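The arithmetic behind the table is a straight sum of tasks across all stages, because real-time mode schedules every stage at once. A minimal sketch (`required_slots` is a hypothetical helper, not a Spark API):

```python
def required_slots(stage_task_counts):
    # Real-time mode schedules every stage of the query simultaneously,
    # so the cluster needs one slot for every task in every stage.
    return sum(stage_task_counts)

print(required_slots([8]))          # single-stage stateless
print(required_slots([8, 20]))      # two-stage stateful
print(required_slots([8, 20, 20]))  # three-stage
```

If the total exceeds the cluster's available slots, the query won't be scheduled.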

Key considerations

When you configure your compute, consider the following:

  • Unlike micro-batch mode, real-time tasks can stay idle while waiting for data, so right-sizing is essential to avoid wasted resources.
  • Aim for a target utilization level (for example, 50%) by tuning:
    • maxPartitions (for Kafka)
    • spark.sql.shuffle.partitions (for shuffle stages)
  • Databricks recommends setting maxPartitions so that each task handles multiple Kafka partitions to reduce overhead.
  • For simple one-stage jobs, adjust the task slots per worker to match the workload.
  • For shuffle-heavy jobs, experiment to find the minimum number of shuffle partitions that avoids backlogs, and adjust from there. The compute won't schedule the job if it doesn't have enough slots.
note

In Databricks Runtime 16.4 LTS and above, all real-time pipelines use checkpoint v2, which allows seamless switching between real-time and micro-batch modes.

Optimization techniques

| Technique | Enabled by default |
| --- | --- |
| Asynchronous progress tracking: Moves writing to the offset log and commit log into an asynchronous thread, reducing the inter-batch time between two micro-batches. This can help reduce the latency of stateless streaming queries. | No |
| Asynchronous state checkpointing: Helps reduce the latency of stateful streaming queries by beginning to process the next micro-batch as soon as the computation of the previous micro-batch completes, without waiting for state checkpointing. | No |

Monitoring and observability

Measuring query performance is essential for real-time workloads. In real-time mode, traditional batch duration metrics don't reflect actual latency, so you need alternative approaches.

End-to-end latency is workload-specific and sometimes can only be accurately measured with business logic. For example, if the source timestamp is output in Kafka, you can calculate latency as the difference between Kafka's output timestamp and the source timestamp.

You can also estimate end-to-end latency using the built-in metrics and APIs described below.

Built-in metrics with StreamingQueryProgress

The following metrics are included in the StreamingQueryProgress event, which is automatically logged in the driver logs. You can also access them through the StreamingQueryListener's onQueryProgress() callback. The QueryProgressEvent.json() and toString() outputs include the extra real-time mode metrics.

  1. Processing latency (processingLatencyMs). The time elapsed between when the real-time mode query reads a record and when the query writes it to the next stage or downstream. For single-stage queries, this measures the same duration as E2E latency. The system reports this metric per task.
  2. Source queuing latency (sourceQueuingLatencyMs). The amount of time elapsed between when the system writes a record to a message bus, for example, the log append time in Kafka, and when the real-time mode query first reads the record. The system reports this metric per task.
  3. E2E Latency (e2eLatencyMs). The time between when the system writes the record to a message bus and when the real-time mode query writes the record downstream. The system aggregates this metric per batch across all records processed by all tasks.

For example:

JSON
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
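These progress events can also be consumed programmatically. A minimal listener sketch that prints the raw progress JSON (which contains the rtmMetrics block), assuming spark is an active session:

```python
from pyspark.sql.streaming import StreamingQueryListener

class RtmMetricsListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        # progress.json includes the extra real-time mode metrics.
        print(event.progress.json)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(RtmMetricsListener())
```

In practice, you would parse the JSON and forward the latency percentiles to your monitoring system instead of printing them.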

Custom latency measurement with Observe API

The Observe API helps measure latency without launching another job. If you have a source timestamp that approximates the source data arrival time, you can estimate each batch's latency with the Observe API by computing the timestamp difference just before the record reaches the sink:

Python
from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
    return datetime.now()

# Add these transformations to the query just before the sink. Here, df is
# the streaming DataFrame, which carries a source "timestamp" column.
observed_df = (
    df
    .withColumn("temp-timestamp", current_timestamp())
    .withColumn(
        "latency",
        unix_millis(col("temp-timestamp")).cast("long")
        - unix_millis(col("timestamp")).cast("long"))
    .observe(
        "observedLatency",
        avg(col("latency")).alias("avg"),
        max(col("latency")).alias("max"),
        percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
        percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
    .drop(col("latency"))
    .drop(col("temp-timestamp"))
)
# Continue with the output part of the query, for example observed_df.writeStream...

In this example, a current timestamp is recorded before outputting the entry, and latency is estimated by calculating the difference between this timestamp and the record's source timestamp. The results are included in progress reports and made available to listeners. Here is a sample output:

JSON
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}

Feature support and limitations

This section describes the supported features and current limitations of real-time mode, including compatible environments, languages, sources, sinks, operators, and special considerations for specific features.

Supported environments, languages, and modes

| Compute type | Supported |
| --- | --- |
| Dedicated (formerly: single user) | Yes |
| Standard (formerly: shared) | No |
| Lakeflow Spark Declarative Pipelines Classic | No |
| Lakeflow Spark Declarative Pipelines Serverless | No |
| Serverless | No |

Supported languages:

| Language | Supported |
| --- | --- |
| Scala | Yes |
| Java | Yes |
| Python | Yes |

Supported execution modes:

| Execution mode | Supported |
| --- | --- |
| Update mode | Yes |
| Append mode | No |
| Complete mode | No |

Supported sources and sinks

Sources:

| Sources | Supported |
| --- | --- |
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Event Hubs (using Kafka Connector) | Yes |
| Kinesis | Yes (EFO mode only) |
| Google Pub/Sub | No |
| Apache Pulsar | No |

Sinks:

| Sinks | Supported |
| --- | --- |
| Apache Kafka | Yes |
| Event Hubs (using Kafka Connector) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Arbitrary sinks (using forEachWriter) | Yes |

Supported operators

| Operators | Supported |
| --- | --- |
| Stateless operations | |
| Selection | Yes |
| Projection | Yes |
| UDFs | |
| Scala UDF | Yes (with some limitations) |
| Python UDF | Yes (with some limitations) |
| Aggregation | |
| sum | Yes |
| count | Yes |
| max | Yes |
| min | Yes |
| avg | Yes |
| Aggregation functions | Yes |
| Windowing | |
| Tumbling | Yes |
| Sliding | Yes |
| Session | No |
| Deduplication | |
| dropDuplicates | Yes (the state is unbounded) |
| dropDuplicatesWithinWatermark | No |
| Stream-table join | |
| Broadcast table (should be small) | Yes |
| Stream-stream join | No |
| (flat)MapGroupsWithState | No |
| transformWithState | Yes (with some differences) |
| union | Yes (with some limitations) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | No (see limitation) |

Special considerations

Some operators and features have specific considerations or differences when used in real-time mode.

transformWithState in real-time mode

For building custom stateful applications, Databricks supports transformWithState, an API in Apache Spark Structured Streaming. See Build a custom stateful application for more information about the API and code snippets.

However, there are some differences between how the API behaves in real-time mode and traditional streaming queries that leverage the micro-batch architecture.

  • Real-time mode calls the handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) method once for each row.
    • The inputRows iterator therefore returns a single value. In micro-batch mode, the method is called once per key, and the inputRows iterator returns all values for that key in the micro-batch.
    • Account for this difference when writing your code.
  • Event time timers are not supported in real-time mode.
  • In real-time mode, timer firing can be delayed, depending on data arrival:
    • If a timer is scheduled for 10:00:00 but no data arrives, the timer doesn't fire immediately.
    • If data arrives at 10:00:10, the timer fires with a 10-second delay.
    • If no data arrives and the long-running batch is terminating, the timer fires before the batch terminates.
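The per-row calling convention can be illustrated with a plain-Python sketch. This is not the Spark API; `handle_input_rows_calls` is a hypothetical helper that only counts how often handleInputRows would fire for a batch of (key, value) rows:

```python
def handle_input_rows_calls(batch, realtime):
    # Illustrative only: model how often handleInputRows fires per mode.
    calls = []
    if realtime:
        # Real-time mode: one call per row; the iterator yields one value.
        for key, value in batch:
            calls.append((key, [value]))
    else:
        # Micro-batch mode: one call per key; the iterator yields all
        # values for that key in the micro-batch.
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        calls = list(grouped.items())
    return calls

batch = [("a", 1), ("a", 2), ("b", 3)]
print(len(handle_input_rows_calls(batch, realtime=True)))   # one call per row
print(len(handle_input_rows_calls(batch, realtime=False)))  # one call per key
```

State handlers written for micro-batch mode that accumulate across the iterator must instead accumulate across calls in real-time mode.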

Python UDFs in real-time mode

Databricks supports the majority of Python user-defined functions (UDFs) in real-time mode:

| UDF type | Supported |
| --- | --- |
| Stateless UDF | |
| Python scalar UDF | Yes |
| Arrow scalar UDF | Yes |
| Pandas scalar UDF | Yes |
| Arrow function (mapInArrow) | Yes |
| Pandas function | Yes |
| Stateful grouping UDF (UDAF) | |
| transformWithState (Row interface only) | Yes |
| applyInPandasWithState | No |
| Non-stateful grouping UDF (UDAF) | |
| apply | No |
| applyInArrow | No |
| applyInPandas | No |
| Table function | |
| UDTF | No |
| UC UDF | No |

There are several points to consider when using Python UDFs in real-time mode:

  • To minimize the latency, configure the Arrow batch size (spark.sql.execution.arrow.maxRecordsPerBatch) to 1.
    • Trade-off: This configuration optimizes for latency at the expense of throughput. For most workloads, this setting is recommended.
    • Increase the batch size only if a higher throughput is required to accommodate input volume, accepting the potential increase in latency.
  • Pandas UDFs and functions do not perform well with an Arrow batch size of 1.
    • If you use pandas UDFs or functions, set the Arrow batch size to a higher value (for example, 100 or higher).
    • Note that this implies higher latency. Databricks recommends using Arrow UDF or function if possible.
  • Due to this pandas performance issue, transformWithState is only supported with the Row interface.
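The Arrow batch size described above is a single Spark setting; for example, to optimize for latency:

```
spark.sql.execution.arrow.maxRecordsPerBatch 1
```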

Limitations

Source limitations

For Kinesis, real-time mode doesn't support polling mode. In addition, frequent repartitioning might negatively affect latency.

Union limitations

The Union operator has some limitations:

  • Real-time mode doesn't support self-union:
    • Kafka: You can't use the same source data frame object and union derived data frames from it. Workaround: Use different DataFrames that read from the same source.
    • Kinesis: You can't union data frames derived from the same Kinesis source with the same configuration. Workaround: Besides using different DataFrames, you can assign a different 'consumerName' option to each DataFrame.
  • Real-time mode doesn't support stateful operators (for example, aggregate, deduplicate, transformWithState) defined before the Union.
  • Real-time mode doesn't support union with batch sources.
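A sketch of the Kafka self-union workaround, assuming broker_address and input_topic are defined and spark is an active session; each branch gets its own independently created DataFrame instead of reusing one object (`kafka_reader` is a hypothetical helper):

```python
def kafka_reader():
    # Build a fresh streaming DataFrame for each union branch.
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load())

# Supported: union of two independently created DataFrames.
unioned = kafka_reader().union(kafka_reader())

# Not supported in real-time mode: self-union of a single DataFrame object.
# df = kafka_reader()
# df.union(df)
```

For Kinesis, the same pattern applies, with a distinct 'consumerName' option set on each reader.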

MapPartitions limitation

mapPartitions in Scala and similar Python APIs (mapInPandas, mapInArrow) take an iterator over the entire input partition and produce an iterator over the entire output, with an arbitrary mapping between input and output. These APIs can cause performance issues in real-time mode by blocking the entire output, which increases latency. The semantics of these APIs also don't support watermark propagation well.

Instead, use scalar UDFs combined with functions that transform complex data types, or filters, to achieve similar functionality.
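For example, per-record logic that might otherwise live in mapInPandas can often be expressed as a scalar UDF, which processes records without holding the whole partition. A sketch (`normalize` and the column names are hypothetical):

```python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def normalize(value):
    # Per-record transformation; no partition-wide iterator needed.
    return value.strip().lower() if value is not None else None

# cleaned = df.withColumn("clean_value", normalize(col("raw_value")))
```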

Next steps

Now that you understand what real-time mode is and how to configure it, explore these resources to start implementing real-time streaming applications:

  • Get started with real-time mode: step-by-step setup instructions.
  • Real-time mode examples: code examples.