
Run multiple Structured Streaming queries on the same cluster

Many customers run multiple Structured Streaming queries on the same Databricks cluster. Although this pattern is supported, Databricks recommends limiting the number of queries per cluster to avoid scaling issues and performance bottlenecks. On serverless compute, Databricks manages scaling automatically, so these considerations are handled for you. If you are using classic compute, where you control driver and executor sizing, this page describes the key bottlenecks to keep in mind and ways to address them.

note

Databricks recommends using Lakeflow Spark Declarative Pipelines, which manages infrastructure complexity automatically, for new streaming workloads. See Lakeflow Spark Declarative Pipelines.

When to use multiple queries on the same cluster

Running multiple streaming queries on the same cluster reduces infrastructure costs, especially when you have many small streams that don't each require dedicated compute. The key tradeoff is shared failure: if the cluster fails, every stream on it fails. For mission-critical pipelines, that shared failure mode is often unacceptable.

For workloads that mix critical and non-critical streams, Databricks recommends the following:

  • Assign each stream a priority based on its business impact.
  • Place mission-critical streams on dedicated clusters, even at higher cost.
  • Co-locate lower-priority streams to share compute and reduce cost.
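These placement rules can be sketched as a small planning helper. This is a minimal sketch, not a Databricks API. It simplifies priority to a hypothetical `critical` flag and uses a hypothetical cap, `max_streams_per_shared_cluster`, on how many lower-priority streams share one cluster. It only produces a plan and does not create any clusters.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamSpec:
    name: str
    critical: bool  # hypothetical flag: True for mission-critical streams


def plan_clusters(streams, max_streams_per_shared_cluster=4):
    """Map hypothetical cluster labels to the stream names placed on them."""
    if max_streams_per_shared_cluster < 1:
        raise ValueError("max_streams_per_shared_cluster must be at least 1")
    plan = {}
    shared = []
    for stream in streams:
        if stream.critical:
            # Mission-critical streams each get a dedicated cluster.
            plan[f"dedicated-{stream.name}"] = [stream.name]
        else:
            shared.append(stream.name)
    # Co-locate lower-priority streams in groups of at most n per cluster.
    n = max_streams_per_shared_cluster
    for i in range(0, len(shared), n):
        plan[f"shared-{i // n}"] = shared[i:i + n]
    return plan


if __name__ == "__main__":
    streams = [
        StreamSpec("orders", critical=True),
        StreamSpec("clicks", critical=False),
        StreamSpec("logs", critical=False),
        StreamSpec("metrics", critical=False),
    ]
    print(plan_clusters(streams, max_streams_per_shared_cluster=2))
```

Capping the group size keeps each shared cluster small. The shared failure mode therefore affects only a few streams at a time.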

Driver sizing

The driver is a shared resource. Multiple queries share the same CPU, memory, DAG scheduler, task scheduler, and driver-side user code execution (for example, foreachBatch functions). When running many concurrent streams, watch for these specific bottlenecks in addition to standard CPU and memory provisioning:

  • Auto Loader overhead: If your streams use Auto Loader, file discovery and directory listing increase driver pressure.
  • OS-level resource limits (open files): Running many file-based streams (such as FileStreamSource or Auto Loader) simultaneously on a single driver can exhaust the per-user open file descriptor limit, which can cause seemingly random stream failures.
  • Listener bus backpressure: A large number of concurrent streaming queries can cause backpressure on the StreamingQueryListener bus, which all queries in the Spark session share. All events (including onQueryIdle) are sent to this single bus, and a large event backlog can severely delay asynchronous onQueryProgress handlers and affect cluster stability.
  • Expensive driver operations: Avoid calling collect() or other expensive DataFrame operations on the driver unless absolutely necessary, to avoid materializing large result sets and causing out-of-memory (OOM) errors.
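To check how close a driver is to its open-file limit, you can read the limit from a Python process on the driver using the standard `resource` module (Unix only). This is a sketch under two assumptions. First, the per-stream descriptor count (`est_fds_per_stream`) is a hypothetical estimate that you would measure yourself. Second, the call reports the limit of the Python process, which usually, but not necessarily, matches the limit of the driver JVM.

```python
import resource


def nofile_limits():
    # Soft limit: what the process is held to now.
    # Hard limit: the ceiling an unprivileged process may raise the soft limit to.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft, hard


def check_fd_budget(num_streams, est_fds_per_stream, soft_limit):
    """Return True if a rough descriptor estimate fits under the soft limit."""
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return num_streams * est_fds_per_stream < soft_limit


if __name__ == "__main__":
    soft, hard = nofile_limits()
    print(f"RLIMIT_NOFILE soft={soft} hard={hard}")
    # Hypothetical estimate: 50 file-based streams holding ~100 files open each.
    print("fits:", check_fd_budget(50, 100, soft))
```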

Troubleshoot driver contention

If you are experiencing driver crashes due to OOM or contention issues:

  1. Monitor driver metrics in the Spark UI. If you see high CPU, memory, or disk usage, adjust driver sizing in the cluster compute settings.
  2. If issues persist, verify that your code is not running memory-intensive operations or UDFs on the driver.
  3. If you cannot scale the driver vertically any further, Databricks strongly recommends splitting your jobs across multiple clusters to bypass these shared-node scaling bottlenecks.

Executor sizing

With multiple queries running on the same cluster, all queries share task slots on the executors. Stages from one query can occupy all available slots, delaying or starving other queries. By default, Spark maps each task slot to one core, so make sure that enough cores are available if queries need to run concurrently.
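You can check the slot arithmetic by hand. The sketch below assumes the default `spark.task.cpus=1` unless you pass another value. It treats the tasks of stages that want to run at the same moment as competing for one shared pool of slots, and it assumes equal-length tasks. The helper names are hypothetical.

```python
import math


def total_task_slots(num_workers, cores_per_worker, task_cpus=1):
    # Each task reserves spark.task.cpus cores, so slots per worker = cores // task_cpus.
    return num_workers * (cores_per_worker // task_cpus)


def waves_needed(concurrent_stage_tasks, slots):
    # concurrent_stage_tasks: task counts of stages (one per query) that run at once.
    total_tasks = sum(concurrent_stage_tasks)
    return math.ceil(total_tasks / slots)


if __name__ == "__main__":
    slots = total_task_slots(num_workers=4, cores_per_worker=8)
    # Three queries whose stages each launch 200 tasks at the same moment.
    print(slots, waves_needed([200, 200, 200], slots))
```

In this example, 4 workers with 8 cores each give 32 slots. The 600 competing tasks need about 19 waves to drain, so any single query's batch can be delayed well beyond its own 7 waves.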

In general, executors might perform more memory-intensive operations than the driver. Tune the executor JVM and off-heap memory allocation parameters if needed to handle your application load. Make sure executor nodes are sized appropriately for CPU, memory, and disk space, and scale vertically if needed. If vertical scaling is not possible, consider adding more worker nodes to the cluster.
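The memory parameters mentioned above are ordinary Spark configuration keys. The values below are illustrative placeholders, not recommendations. On Databricks, several executor memory settings are derived from the node type, so check before overriding them. The helper reproduces open-source Spark's default for `spark.executor.memoryOverhead`: 10% of executor memory, with a 384 MiB floor. Treat that formula as an assumption for your runtime.

```python
# Illustrative placeholder values only; size these for your own node types.
executor_memory_conf = {
    "spark.executor.memory": "8g",            # executor JVM heap
    "spark.executor.memoryOverhead": "2g",    # native and VM overhead outside the heap
    "spark.memory.offHeap.enabled": "true",   # let Spark use off-heap memory
    "spark.memory.offHeap.size": "2g",        # must be positive when off-heap is enabled
}


def default_memory_overhead_mb(executor_memory_mb, factor=0.10, minimum_mb=384):
    """Open-source Spark's default overhead when spark.executor.memoryOverhead is unset."""
    return max(minimum_mb, int(executor_memory_mb * factor))


if __name__ == "__main__":
    for key, value in executor_memory_conf.items():
        print(f"{key} {value}")  # "key value" form, as in a cluster's Spark config field
    print(default_memory_overhead_mb(8192))
```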

note

Some of these changes might require restarting the cluster to take effect.

Use scheduler pools

You can configure scheduler pools to assign compute capacity to queries when running multiple streaming queries from the same source code.

By default, all queries started in a notebook run in the same fair scheduling pool. Apache Spark jobs generated by triggers from all of the streaming queries in a notebook run one after another in "first in, first out" (FIFO) order. This can cause unnecessary delays in the queries, because they aren't efficiently sharing the cluster resources.

Scheduler pools allow you to declare which Structured Streaming queries share compute resources.

The following example assigns query1 to a dedicated pool, whereas query2 and query3 share a scheduler pool.

Python
# Assumes `spark` (predefined in Databricks notebooks) and a streaming DataFrame `df`.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

note

The local property configuration must be in the same notebook cell where you start your streaming query.
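To keep the pool assignment scoped to the cell that starts a query, you can wrap it in a context manager. This is a minimal sketch with a hypothetical helper name, `scheduler_pool`. It relies on two things: the query picking up the pool when it starts, as the pattern above does, and the underlying Spark behavior that setting a local property to null removes it.

```python
from contextlib import contextmanager

POOL_KEY = "spark.scheduler.pool"


@contextmanager
def scheduler_pool(sc, pool_name):
    """Set the fair scheduler pool for code in this block, then restore the old value."""
    previous = sc.getLocalProperty(POOL_KEY)
    sc.setLocalProperty(POOL_KEY, pool_name)
    try:
        yield
    finally:
        # Passing None clears the property if it was not set before.
        sc.setLocalProperty(POOL_KEY, previous)


# Usage in a notebook cell (assumes `spark` and a streaming DataFrame `df`):
# with scheduler_pool(spark.sparkContext, "pool1"):
#     df.writeStream.queryName("query1").toTable("table1")
```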

For more information about fair scheduler pools, see the Apache Spark fair scheduler documentation.
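In open-source Spark, pools can also receive weights and minimum shares through a fair scheduler allocation file, which Spark reads from the path in `spark.scheduler.allocation.file`. The following sketch generates such a file with the standard library. The pool names and numbers are illustrative. Whether your compute lets you set an allocation file is an assumption you should verify.

```python
import xml.etree.ElementTree as ET


def fair_scheduler_xml(pools):
    """Build a fair scheduler allocation file from {pool_name: settings}."""
    root = ET.Element("allocations")
    for name, settings in pools.items():
        pool = ET.SubElement(root, "pool", name=name)
        # Scheduling mode for jobs inside the pool: FAIR or FIFO.
        ET.SubElement(pool, "schedulingMode").text = settings.get("schedulingMode", "FAIR")
        # Relative share of the cluster compared with other pools.
        ET.SubElement(pool, "weight").text = str(settings.get("weight", 1))
        # Minimum number of CPU cores the scheduler tries to give the pool first.
        ET.SubElement(pool, "minShare").text = str(settings.get("minShare", 0))
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # Hypothetical weights: give pool1 (query1) twice the share of pool2.
    print(fair_scheduler_xml({"pool1": {"weight": 2, "minShare": 4}, "pool2": {"weight": 1}}))
```

According to the Spark documentation, pools not listed in the file get default settings (FIFO scheduling mode, weight 1, minShare 0). For this reason, the sketch writes every setting explicitly.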

Stateful query considerations

For stateful queries running on the same cluster, keep the following in mind:

  • Use RocksDB as the state store provider to avoid OOM issues and GC pauses. RocksDB is the default state store provider in Databricks Runtime 17.3 and above. See Configure RocksDB state store on Databricks.
  • Tune shuffle partitions for your application requirements. For stateful stages, Spark schedules tasks proportional to the number of shuffle partitions.
  • Cap RocksDB memory usage on a per-node basis to avoid OOM errors from off-heap memory usage. This is handled automatically in Databricks Runtime 17.3 and above, but requires manual configuration on earlier releases. See Cap RocksDB memory usage.
  • Avoid packing too many partitions on the same executor node. Maintenance operations on the state store, including snapshot upload and cleanup, run on a per-node basis. Assigning too many partitions to one executor node can cause maintenance starvation and longer recovery times due to fewer full snapshots being available.
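The settings in this list map to Spark SQL configuration keys. The sketch below collects them in one place with placeholder values. It also adds a rough, hypothetical helper for the partition-packing check. That helper assumes state partitions spread evenly across executor nodes, which Spark does not guarantee. Confirm the key names against the RocksDB pages referenced above for your runtime.

```python
import math

# Illustrative session configs for a stateful stream; values are placeholders.
stateful_conf = {
    # RocksDB provider class as named in Databricks documentation.
    "spark.sql.streaming.stateStore.providerClass":
        "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
    # Sets the task count of stateful stages; a stateful query keeps the value
    # recorded in its checkpoint, so choose it before the first run.
    "spark.sql.shuffle.partitions": "64",
    # Bound RocksDB memory per node (manual step before Runtime 17.3).
    "spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage": "true",
    "spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB": "2048",
}


def state_partitions_per_node(partitions_per_query, executor_nodes):
    """Rough per-node state partition count, assuming partitions spread evenly."""
    return math.ceil(sum(partitions_per_query) / executor_nodes)


if __name__ == "__main__":
    # In a notebook you might apply each config with spark.conf.set(key, value).
    for key, value in stateful_conf.items():
        print(f"{key} = {value}")
    # Three stateful queries sharing four executor nodes.
    print(state_partitions_per_node([64, 64, 200], executor_nodes=4))
```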