Enable state rebalancing for Structured Streaming workloads

Enabling state rebalancing for stateful streaming queries can increase cluster utilization and reduce end-to-end micro-batch latency.

Note

Available in Databricks Runtime 11.1 and above.

How does state rebalancing work?

Structured Streaming stateful operators (aggregations, joins) keep their state locally on the executors and back it up to cloud storage. The state for each operator is split across many tasks (partitions). Because loading state from cloud storage is expensive, the Apache Spark task scheduler prefers to schedule each stateful task on the executor that ran it in previous micro-batches, so that the task can use that executor's state cache. This can lead to inefficient cluster utilization when new executors are added: the stateful tasks continue to be scheduled on the old executors while the newly added executors stay idle.

State rebalancing proactively redistributes the stateful tasks whenever new executors are added to the cluster, so that the new executors are used immediately and stateful tasks are spread evenly across all executors. The micro-batch in which stateful tasks move to the new executors may see a temporary latency increase, because those tasks must load their state from cloud storage. Over a longer period, this cost should be amortized by more efficient cluster utilization and lower batch latencies from spreading execution across more executors.
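The scheduling difference can be illustrated with a toy model in plain Python. This is a simplified sketch under stated assumptions, not Spark's actual scheduler or the Databricks rebalancing algorithm: each partition's state is cached on exactly one executor, the locality-preferring policy never moves a partition, and the rebalancing policy keeps partitions on their current executor up to an even per-executor target and moves only the overflow. The function names and executor ids are hypothetical.

```python
from collections import Counter


def sticky_assignment(prev: dict[int, str], executors: list[str]) -> dict[int, str]:
    """Locality-preferring model: every partition stays where its state is cached."""
    # `executors` is unused on purpose: new executors never receive work here.
    return dict(prev)


def rebalanced_assignment(prev: dict[int, str], executors: list[str]) -> dict[int, str]:
    """Even-spread model: per-executor loads differ by at most one partition."""
    base, extra = divmod(len(prev), len(executors))
    # Target load per executor; the first `extra` executors take one extra partition.
    target = {e: base + (1 if i < extra else 0) for i, e in enumerate(executors)}
    load: Counter = Counter()
    result: dict[int, str] = {}
    overflow: list[int] = []
    # Pass 1: keep a partition on its cached executor while that executor is under target.
    for part, ex in sorted(prev.items()):
        if load[ex] < target.get(ex, 0):
            result[part] = ex
            load[ex] += 1
        else:
            overflow.append(part)
    # Pass 2: move the remaining partitions to executors with spare capacity.
    # In this model, each move stands for one state reload from cloud storage.
    for part in overflow:
        ex = next(e for e in executors if load[e] < target[e])
        result[part] = ex
        load[ex] += 1
    return result


# Eight state partitions spread over two executors, then two executors are added.
prev = {p: f"exec-{p % 2 + 1}" for p in range(8)}
executors = ["exec-1", "exec-2", "exec-3", "exec-4"]
print(Counter(sticky_assignment(prev, executors).values()))      # exec-3 and exec-4 stay idle
print(Counter(rebalanced_assignment(prev, executors).values()))  # two partitions each
```

In this model, moving four of the eight partitions is the price paid once for an even spread afterwards, which mirrors the temporary latency increase described above.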

[Figure: Execution with and without state rebalancing]

Which workloads benefit from state rebalancing?

State rebalancing benefits stateful Structured Streaming pipelines that use cluster autoscaling or otherwise experience changes in cluster size. Stateless streaming operations do not benefit, regardless of changes in cluster size.

Enabling state rebalancing

Set the following option in the cluster's Spark configuration:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
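To confirm from a notebook that the option reached the cluster, you can read it back through the session's runtime configuration. The sketch below assumes only that the object passed in has a `get(key, default)` method, which holds for `spark.conf` in PySpark and for a plain `dict`. The helper name and the `"false"` fallback used when the key is unset are illustrative choices, not documented Databricks defaults.

```python
from typing import Any

STATE_REBALANCING_KEY = "spark.sql.streaming.statefulOperator.stateRebalancing.enabled"


def is_state_rebalancing_enabled(conf: Any) -> bool:
    """Return True if the rebalancing option is set to "true" (case-insensitive).

    `conf` is anything with a `get(key, default)` method, such as `spark.conf`
    in a notebook attached to the cluster, or a plain dict in tests.
    """
    # Assumption: treat a missing key as "not enabled" for reporting purposes.
    value = conf.get(STATE_REBALANCING_KEY, "false")
    return str(value).strip().lower() == "true"
```

In a notebook attached to the cluster, call `is_state_rebalancing_enabled(spark.conf)`.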

Monitoring state rebalancing

The state rebalancing feature does not require special monitoring. One indication that it is operating correctly is that Spark tasks execute approximately evenly across all executors, as shown in the Executors tab of the Spark UI.
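If you prefer to check the distribution programmatically rather than by eye, the Spark monitoring REST API exposes per-executor counters at `/api/v1/applications/<app-id>/executors`, including a cumulative `totalTasks` field. A minimal sketch follows. The fetch helper assumes unauthenticated access to that endpoint, and the UI URL and application id are placeholders you supply; how you reach the Spark UI depends on your deployment. The sketch compares two snapshots, because cumulative counts would always make recently added executors look under-used. The helper names are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_executors(ui_url: str, app_id: str) -> list[dict]:
    """Read executor summaries from the Spark monitoring REST API.

    `ui_url` and `app_id` are placeholders; this assumes the endpoint is
    reachable without extra authentication, which may not hold for your cluster.
    """
    with urlopen(f"{ui_url}/api/v1/applications/{app_id}/executors") as resp:
        return json.load(resp)


def tasks_between(before: list[dict], after: list[dict]) -> dict[str, int]:
    """Tasks each executor ran between two snapshots, excluding the driver.

    `totalTasks` is cumulative since the executor started, so a delta is used;
    executors missing from `before` (newly added) count from zero.
    """
    prior = {e["id"]: e["totalTasks"] for e in before}
    return {
        e["id"]: e["totalTasks"] - prior.get(e["id"], 0)
        for e in after
        if e["id"] != "driver"
    }


def imbalance_ratio(tasks: dict[str, int]) -> float:
    """Busiest executor's task count divided by the least busy one's (1.0 = even)."""
    if not tasks:
        raise ValueError("no executors in snapshot")
    low, high = min(tasks.values()), max(tasks.values())
    return float("inf") if low == 0 else high / low
```

For example, take `before = fetch_executors(url, app_id)`, wait for a few micro-batches, take `after` the same way, then inspect `imbalance_ratio(tasks_between(before, after))`. What ratio counts as "approximately even" is a judgment call; this sketch does not prescribe a threshold.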

A cluster resizing event triggers state rebalancing. During a rebalancing event, a micro-batch may have higher latency while its state is loaded from cloud storage onto the new executors.
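To spot these occasional slow micro-batches, you can scan the progress reports of a running query. In PySpark 3.x, `StreamingQuery.recentProgress` returns a bounded list of recent progress reports as dicts whose `durationMs` map includes a `triggerExecution` entry. The sketch below flags batches slower than a multiple of the median; the `slow_batches` name and the default factor of 2.0 are arbitrary assumptions, not a Databricks recommendation.

```python
from statistics import median


def slow_batches(progress: list[dict], factor: float = 2.0) -> list[int]:
    """Return batch ids whose triggerExecution time exceeds `factor` times the median."""
    # Keep only reports that carry a triggerExecution duration.
    durations = [
        (p["batchId"], p["durationMs"]["triggerExecution"])
        for p in progress
        if "triggerExecution" in p.get("durationMs", {})
    ]
    if not durations:
        return []
    typical = median(d for _, d in durations)
    return [batch for batch, d in durations if d > factor * typical]
```

Call it as `slow_batches(query.recentProgress)`. A single flagged batch right after a resize is consistent with the expected rebalancing cost, whereas persistently slow batches suggest a different cause.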