Optimize stateful Structured Streaming queries
Managing the intermediate state information of stateful Structured Streaming queries can help prevent unexpected latency and production problems.
Work with multiple stateful operators in Structured Streaming
In Databricks Runtime 13.1 and above, Databricks offers advanced support for stateful operators in Structured Streaming workloads. You can now chain multiple stateful operators together, meaning that you can feed the output of an operation such as a windowed aggregation to another stateful operation such as a join.
The following examples demonstrate several patterns you can use.
Important
The following limitations apply when you work with multiple stateful operators:
flatMapGroupsWithState is not supported.
Only the append output mode is supported.
Chained time window aggregation
from pyspark.sql.functions import window, window_time
# streaming DataFrame of schema { timestamp: Timestamp, word: String }
# words must define a watermark (for example, with withWatermark) because only append mode is supported
words = ...
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import org.apache.spark.sql.functions._
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()
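To see how the second aggregation consumes the output of the first, the following pure-Python model (no Spark involved) assigns an event time to its 10-minute windows that slide every 5 minutes, converts each window back to a single event time the way window_time does (the window end minus one microsecond), and then assigns that time to a 1-hour tumbling window. It is a sketch under stated assumptions: windows are aligned to the epoch with no start offset, watermarks and state are ignored, and assign_windows and the sample timestamp are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def assign_windows(ts, size, slide):
    """Return every (start, end) window that contains ts, start inclusive and end exclusive,
    with window starts aligned to multiples of `slide` from the epoch."""
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start + size > ts:  # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def window_time(window):
    """Event time of an aggregated window row: the window end minus one microsecond."""
    _, end = window
    return end - timedelta(microseconds=1)


event = datetime(2024, 1, 1, 12, 57)
for w in assign_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    # A tumbling window is a sliding window whose slide equals its size.
    hourly = assign_windows(window_time(w), timedelta(hours=1), timedelta(hours=1))[0]
    print(w, "->", hourly)
```

For an event at 12:57, the model yields the windows [12:50, 13:00) and [12:55, 13:05). Their event times (12:59:59.999999 and 13:04:59.999999) fall into the hourly windows [12:00, 13:00) and [13:00, 14:00) respectively, so one input event can contribute to two hourly counts.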
Time window aggregation in two different streams followed by stream-stream window join
from pyspark.sql.functions import window
# clicksWithWatermark and impressionsWithWatermark are streaming DataFrames with watermarks defined
clicksWindow = clicksWithWatermark.groupBy(
    clicksWithWatermark.clickAdId,
    window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
    impressionsWithWatermark.impressionAdId,
    window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
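import org.apache.spark.sql.functions._
import spark.implicits._
// window() takes a Column, so reference the time columns with $"..."
val clicksWindow = clicksWithWatermark
  .groupBy(window($"clickTime", "1 hour"))
  .count()
val impressionsWindow = impressionsWithWatermark
  .groupBy(window($"impressionTime", "1 hour"))
  .count()
clicksWindow.join(impressionsWindow, Seq("window"), "inner")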
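As a rough illustration of what the window join produces, the following pure-Python model (no Spark involved) computes hourly counts per ad for two small, hypothetical event lists and then inner-joins them on the hourly window alone, mirroring join(..., "window", "inner") in the Python version. The helpers hourly_window, windowed_counts, and inner_join_on_window are hypothetical, and the model ignores watermarks, state cleanup, and incremental output.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
HOUR = timedelta(hours=1)


def hourly_window(ts):
    """Tumbling 1-hour window (start inclusive, end exclusive) aligned to the epoch."""
    start = ts - (ts - EPOCH) % HOUR
    return (start, start + HOUR)


def windowed_counts(events):
    """events: iterable of (ad_id, event_time). Returns {(ad_id, window): count}."""
    counts = defaultdict(int)
    for ad_id, ts in events:
        counts[(ad_id, hourly_window(ts))] += 1
    return dict(counts)


def inner_join_on_window(left, right):
    """Inner join of two windowed-count dicts using only the window as the join key."""
    right_by_window = defaultdict(list)
    for (ad_id, window), count in right.items():
        right_by_window[window].append((ad_id, count))
    rows = []
    for (ad_id, window), count in left.items():
        for right_ad_id, right_count in right_by_window.get(window, []):
            rows.append((window, ad_id, count, right_ad_id, right_count))
    return rows


clicks = [("a", datetime(2024, 1, 1, 12, 10)), ("a", datetime(2024, 1, 1, 12, 20)),
          ("b", datetime(2024, 1, 1, 13, 5))]
impressions = [("a", datetime(2024, 1, 1, 12, 1)), ("b", datetime(2024, 1, 1, 12, 30))]
for row in inner_join_on_window(windowed_counts(clicks), windowed_counts(impressions)):
    print(row)
```

Because the join key is only the window, the clicks row for ad a in the 12:00 hour is paired with both impressions rows in that hour (ads a and b), and the 13:00 clicks row for ad b has no impressions in its hour, so the inner join drops it.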
Stream-stream time interval join followed by time window aggregation
from pyspark.sql.functions import expr, window
joined = impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter"  # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
    joined.clickAdId,
    window(joined.clickTime, "1 hour")
).count()
import org.apache.spark.sql.functions._
import spark.implicits._
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()
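The time-interval condition and the aggregation that follows can be modeled in plain Python (no Spark involved) to check which pairs match. This sketch covers inner-join semantics only: it pairs an impression with a click when the ad IDs match and the click falls within one hour after the impression, then counts joined rows per click ad ID and 1-hour window of the click time. The helpers interval_join and hourly_click_counts and the sample events are hypothetical; the model ignores watermarks and state.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
HOUR = timedelta(hours=1)


def interval_join(impressions, clicks, bound=HOUR):
    """Inner-join semantics only: pair an impression and a click when the ad IDs match and
    impression_time <= click_time <= impression_time + bound."""
    return [
        (imp_ad, imp_time, click_ad, click_time)
        for imp_ad, imp_time in impressions
        for click_ad, click_time in clicks
        if click_ad == imp_ad and imp_time <= click_time <= imp_time + bound
    ]


def hourly_click_counts(joined):
    """Count joined rows per (click ad ID, start of the 1-hour window containing click_time)."""
    counts = Counter()
    for _, _, click_ad, click_time in joined:
        window_start = click_time - (click_time - EPOCH) % HOUR
        counts[(click_ad, window_start)] += 1
    return counts


impressions = [("a", datetime(2024, 1, 1, 12, 0)), ("b", datetime(2024, 1, 1, 12, 30))]
clicks = [("a", datetime(2024, 1, 1, 12, 45)), ("a", datetime(2024, 1, 1, 13, 30)),
          ("b", datetime(2024, 1, 1, 12, 40))]
print(hourly_click_counts(interval_join(impressions, clicks)))
```

Here the click on ad a at 13:30 is more than one hour after its 12:00 impression, so it does not match. With the leftOuter join in the real query, impressions without a matching click would also be kept with null click columns; the model does not try to reproduce that.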
Preventing slow down from garbage collection (GC) pause in stateful streaming
If your streaming query has stateful operations (such as streaming aggregation) and you want to maintain millions of keys in the state, you may face issues related to large JVM garbage collection (GC) pauses, which cause high variation in micro-batch processing times. This happens because, by default, state data is kept in JVM memory. A large number of state objects puts pressure on JVM memory, which causes long GC pauses.
In such cases, you can choose a more optimized state management solution based on RocksDB, which is available in Databricks Runtime. Rather than keeping the state in JVM memory, this solution uses RocksDB to efficiently manage the state in native memory and on the local SSD. In addition, Structured Streaming automatically saves any changes to this state to the checkpoint location you provide, so it offers the same full fault-tolerance guarantees as the default state management. For instructions on configuring RocksDB as the state store, see Configure RocksDB state store on Databricks.
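A minimal sketch of selecting the RocksDB state store from Python, assuming spark is an existing SparkSession (for example, the one a Databricks notebook provides). The configuration key and the Databricks provider class name are the ones used for this purpose on Databricks; the helper name enable_rocksdb_state_store is hypothetical, and the linked configuration page remains the authoritative reference. Because the state management scheme cannot change between restarts of a query, apply the setting before the query first starts on its checkpoint location.

```python
# Configuration key and Databricks RocksDB provider class, set through the SparkSession.
STATE_STORE_PROVIDER_KEY = "spark.sql.streaming.stateStore.providerClass"
ROCKSDB_PROVIDER_CLASS = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def enable_rocksdb_state_store(spark):
    """Hypothetical helper: make new stateful queries in this session use RocksDB.

    `spark` is assumed to be a SparkSession; only its `conf.set` method is used.
    Call this before starting a query for the first time on a new checkpoint location.
    """
    spark.conf.set(STATE_STORE_PROVIDER_KEY, ROCKSDB_PROVIDER_CLASS)


# Usage in a notebook, where `spark` already exists:
# enable_rocksdb_state_store(spark)
```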
Recommended configurations for stateful Structured Streaming on Databricks
Databricks recommends:
Use compute-optimized instances as workers, for example, AWS c3.4xlarge instances.
Set the number of shuffle partitions to 1-2 times the number of cores in the cluster.
Set the spark.sql.streaming.noDataMicroBatches.enabled configuration to false in the SparkSession. This prevents the streaming micro-batch engine from processing micro-batches that do not contain data. Note that setting this configuration to false can also cause stateful operations that use watermarks or processing-time timeouts to withhold their output until new data arrives, instead of emitting it immediately.
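The shuffle-partition and no-data micro-batch recommendations above can be sketched as a small Python helper, assuming spark is an existing SparkSession and that you supply the cluster's total core count yourself (how you obtain it is left open here). The helper names and the default factor of 2 are hypothetical choices within the recommended 1-2x range. Structured Streaming records the number of shuffle partitions of a stateful query in its checkpoint, so a new value only takes effect for a query started on a new checkpoint location.

```python
SHUFFLE_PARTITIONS_KEY = "spark.sql.shuffle.partitions"
NO_DATA_BATCHES_KEY = "spark.sql.streaming.noDataMicroBatches.enabled"


def recommended_shuffle_partitions(total_cores, factor=2.0):
    """Shuffle partitions at `factor` times the cluster's total cores, with 1 <= factor <= 2."""
    if total_cores < 1:
        raise ValueError("total_cores must be at least 1")
    if not 1.0 <= factor <= 2.0:
        raise ValueError("factor must be between 1 and 2")
    return int(total_cores * factor)  # truncates fractional results, e.g. 3 * 1.5 -> 4


def apply_stateful_streaming_settings(spark, total_cores, factor=2.0):
    """Hypothetical helper: apply the shuffle-partition and no-data-batch recommendations.

    `spark` is assumed to be a SparkSession; only its `conf.set` method is used.
    Returns the settings that were applied.
    """
    settings = {
        SHUFFLE_PARTITIONS_KEY: str(recommended_shuffle_partitions(total_cores, factor)),
        NO_DATA_BATCHES_KEY: "false",
    }
    for key, value in settings.items():
        spark.conf.set(key, value)
    return settings


# Usage, for example on a cluster with 64 worker cores in total (an assumed size):
# apply_stateful_streaming_settings(spark, total_cores=64)
```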
Regarding performance, RocksDB-based state management can maintain 100 times more state keys than the default state management. For example, in a Spark cluster with AWS c3.4xlarge instances as workers, the default state management can maintain up to 1-2 million state keys per executor, after which JVM GC starts to significantly affect performance. In contrast, RocksDB-based state management can easily maintain 100 million state keys per executor without any GC issues.
Note
The state management scheme cannot be changed between query restarts. That is, if a query was started with the default state management, you cannot change it without starting the query from scratch with a new checkpoint location.