Optimize and monitor real-time mode query performance
This page covers compute tuning, techniques for reducing end-to-end latency, and approaches for measuring query performance in real-time mode.
Compute tuning
When you configure your compute, consider the following:
- Unlike micro-batch mode, real-time tasks can stay idle while waiting for data, so right-sizing is essential to avoid wasted resources.
- Aim for a target cluster utilization level, such as 50%, by tuning:
  - maxPartitions (for Kafka)
  - spark.sql.shuffle.partitions (for shuffle stages)
- Databricks recommends setting maxPartitions so that each task handles multiple Kafka partitions to reduce overhead.
- Adjust task slots per worker to match the workload for simple one-stage jobs.
- For shuffle-heavy jobs, experiment to find the minimum number of shuffle partitions that avoids backlogs and adjust from there. The compute won't schedule the job if it doesn't have enough task slots.
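The sizing guidance above can be sketched as a small calculation. This is an illustrative back-of-the-envelope helper, not a Databricks API; the function names, the 50% utilization target, and the example cluster shape are all assumptions.

```python
# Rough sizing sketch for the tuning guidance above. All names and the
# 50% utilization target are illustrative, not a Databricks API.

def target_partitions(total_task_slots: int, utilization: float = 0.5) -> int:
    """Number of partitions (concurrent tasks) to aim for so the cluster
    runs at roughly the given utilization level."""
    return max(1, int(total_task_slots * utilization))

def max_partitions_for_kafka(kafka_partitions: int, tasks: int) -> int:
    # Each task should read multiple Kafka partitions, so cap the number
    # of Spark input partitions at the task count rather than creating
    # one task per Kafka partition.
    return min(kafka_partitions, tasks)

# Example: 8 workers x 4 task slots, 50% target utilization,
# a Kafka topic with 64 partitions.
slots = 8 * 4
tasks = target_partitions(slots)
print(tasks, max_partitions_for_kafka(64, tasks))  # 16 16
```

With this shape, each of the 16 tasks reads 4 Kafka partitions, leaving half of the task slots free as headroom.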
In Databricks Runtime 16.4 LTS and above, all real-time pipelines use checkpoint v2 to allow seamless switching between real-time and micro-batch modes.
Latency optimization
Structured Streaming real-time mode offers two optional techniques for reducing end-to-end latency. Neither is enabled by default; you must enable each one separately.
- Asynchronous progress tracking: Moves writes to the offset and commit logs into an asynchronous thread, reducing inter-batch time for stateless queries.
- Asynchronous state checkpointing: Begins processing the next micro-batch as soon as computation completes, without waiting for state checkpointing, reducing latency for stateful queries.
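As a sketch, the two techniques are enabled in different places: asynchronous progress tracking is a sink option, while asynchronous state checkpointing is a Spark configuration. The option and configuration names below are assumptions based on Spark and Databricks documentation, and the paths and format are placeholders; verify both against your runtime version before relying on them.

```python
# Sketch only: option and conf names are assumptions; verify for your
# runtime. Paths and the sink format are placeholders.

# Asynchronous progress tracking (stateless queries): set the sink option.
query = (
    df.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/my-query")
    .option("asyncProgressTrackingEnabled", "true")
    .start("/tmp/output")
)

# Asynchronous state checkpointing (stateful queries): set the Spark conf
# before starting the query.
spark.conf.set(
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
    "true",
)
```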
Monitoring and observability
In real-time mode, traditional batch duration metrics don't reflect actual end-to-end latency. Use the approaches below to measure latency accurately and identify bottlenecks in your queries.
End-to-end latency is workload-specific and sometimes can only be accurately measured with business logic. For example, if the source timestamp is output in Kafka, you can calculate latency as the difference between Kafka's output timestamp and the source timestamp.
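The timestamp-difference calculation described above can be sketched in plain Python. The function name and the example millisecond values are illustrative; in practice the two timestamps would come from your source records and your sink.

```python
# Sketch of the business-logic latency calculation described above.
# The function name and example timestamps are illustrative.

def e2e_latency_ms(source_event_ms: int, sink_write_ms: int) -> int:
    """Latency between the original source timestamp and the time the
    record was written downstream, in milliseconds."""
    return sink_write_ms - source_event_ms

# A record carried a source timestamp of 1_700_000_000_000 ms and was
# written downstream 42 ms later.
print(e2e_latency_ms(1_700_000_000_000, 1_700_000_000_042))  # 42
```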
Built-in metrics with StreamingQueryProgress
The StreamingQueryProgress event is automatically logged in the driver logs and is accessible through the StreamingQueryListener's onQueryProgress() callback. This enables you to react to progress events programmatically, for example to publish metrics to an external monitoring system. QueryProgressEvent.json() and toString() include these real-time mode metrics:
- Processing latency (processingLatencyMs): The time elapsed between when the real-time mode query reads a record and when the query writes it to the next stage or downstream. For single-stage queries, this measures the same duration as end-to-end latency. The system reports this metric per task.
- Source queuing latency (sourceQueuingLatencyMs): The time elapsed between when the system writes a record to a message bus (for example, the log append time in Kafka) and when the real-time mode query first reads the record. The system reports this metric per task.
- End-to-end latency (e2eLatencyMs): The time between when the system writes the record to a message bus and when the real-time mode query writes the record downstream. The system aggregates this metric per batch across all records processed by all tasks.
For example:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
}
}
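As a minimal sketch of consuming these metrics programmatically, the helper below extracts a percentile from the JSON payload that QueryProgressEvent.json() returns. The function name and alert threshold are illustrative; in a real listener you would call something like this from onQueryProgress().

```python
import json

# Sketch: extract a real-time mode latency percentile from a progress
# event's JSON payload (the shape shown in the example above).
# The function name and alert threshold are illustrative.

def p99_e2e_latency_ms(progress_json: str) -> int:
    progress = json.loads(progress_json)
    return progress["rtmMetrics"]["e2eLatencyMs"]["P99"]

sample = (
    '{"rtmMetrics": {"e2eLatencyMs": '
    '{"P0": 0, "P50": 1, "P90": 1, "P95": 2, "P99": 4}}}'
)
print(p99_e2e_latency_ms(sample))  # 4
if p99_e2e_latency_ms(sample) > 100:
    print("p99 end-to-end latency above threshold")
```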
Custom latency measurement with the Observe API
The Observe API enables you to measure latency inline without launching a separate job. If you have a source timestamp that approximates source data arrival time, you can estimate per-batch latency by recording a timestamp before the sink and computing the difference. The results appear in progress reports and are available to listeners.
Python:
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query, for example .writeStream and .start()
Scala:
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query, for example .writeStream and .start()
Sample output:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}