Asynchronous state checkpointing for Structured Streaming

Note

Available in Databricks Runtime 10.3 and above.

For stateful streaming queries bottlenecked on state updates, enabling asynchronous state checkpointing can reduce end-to-end latency without sacrificing any fault-tolerance guarantees, at the minor cost of higher restart delays.

Structured Streaming uses synchronous checkpointing by default: every micro-batch ensures that all the state updates in that batch are backed up in cloud storage (the “checkpoint location”) before the next batch starts. If a stateful streaming query fails, all micro-batches except the last one have therefore been checkpointed, and only the last batch needs to be re-run on restart. Fast recovery with synchronous checkpointing comes at the cost of higher latency for each micro-batch.
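
For reference, the checkpoint location is the path supplied when the streaming query is started. The following minimal sketch (the source, path, and table name are placeholders, not part of this feature) shows a stateful aggregation whose offsets and state updates are backed up under that location:

# Hypothetical stateful aggregation; with synchronous checkpointing, each
# micro-batch's offsets and state updates are backed up under the checkpoint
# location before the next batch starts.
counts = spark.readStream.format("rate").load().groupBy("value").count()

query = (
    counts.writeStream
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/checkpoints/rate_counts")  # placeholder path
        .toTable("rate_counts")  # placeholder table name
)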

Streaming State Checkpointing Modes

Asynchronous state checkpointing performs the state checkpointing asynchronously so that micro-batch execution does not have to wait for the checkpoint to complete. In other words, the next micro-batch can start as soon as the computation of the previous micro-batch has finished. Internally, the offset metadata (also saved in the checkpoint location) tracks whether state checkpointing has completed for each micro-batch. On query restart, more than one micro-batch may need to be re-executed: the last micro-batch, whose computation was incomplete, as well as the micro-batch before it, whose state checkpointing was incomplete. You still get the same fault-tolerance guarantees (that is, exactly-once guarantees with an idempotent sink) as with synchronous checkpointing.

Identifying Structured Streaming workloads that benefit from asynchronous checkpointing

The following are characteristics of streaming jobs that may benefit from asynchronous state checkpointing.

  • Job has one or more stateful operations (e.g., aggregation, flatMapGroupsWithState, mapGroupsWithState, stream-stream joins)

  • State checkpoint latency is one of the major contributors to overall batch execution latency. This information can be found in the StreamingQueryProgress events, which are also available in the log4j logs on the Spark driver. Here is an example of a streaming query progress event and how to find the state checkpoint's impact on the overall batch execution latency.

    •  {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
       }
      
    • State checkpoint latency analysis of the above query progress event

      • Batch duration (durationMs.triggerExecution) is around 547 seconds.

      • State store commit latency (stateOperators[0].commitTimeMs) is around 3,186 seconds. Commit latency is aggregated across all tasks containing a state store. In this case there are 64 such tasks (stateOperators[0].numShufflePartitions).

      • Each task containing a state store took an average of about 50 seconds (3,186/64) to checkpoint. This is extra latency added to the batch duration. Assuming all 64 tasks run concurrently, the checkpoint step contributed around 9% (50 secs / 547 secs) of the batch duration. The percentage gets even higher when the maximum number of concurrent tasks is less than 64. The same calculation can be done programmatically, as sketched below.
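
A rough sketch of that calculation, assuming a running query handle named query; the field names follow the progress event above, and numShufflePartitions may be absent on older runtime versions, in which case spark.sql.shuffle.partitions is used as a stand-in:

# Estimate the state checkpoint share of batch duration from the most recent
# progress event of a running StreamingQuery (names are placeholders).
progress = query.lastProgress
trigger_ms = progress["durationMs"]["triggerExecution"]

for op in progress["stateOperators"]:
    commit_ms = op["commitTimeMs"]
    tasks = int(op.get("numShufflePartitions",
                       spark.conf.get("spark.sql.shuffle.partitions")))
    per_task_ms = commit_ms / tasks
    print(f"avg commit per task: {per_task_ms / 1000:.0f}s "
          f"(~{100 * per_task_ms / trigger_ms:.0f}% of batch duration)")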

Enabling asynchronous state checkpointing

Set the following configuration in the streaming job. Asynchronous checkpointing requires a state store implementation that supports asynchronous commits; currently only the RocksDB-based state store implementation supports them.

# Enable asynchronous state checkpointing for stateful streaming operators
spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

# Asynchronous commits require the RocksDB-based state store provider
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
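
These are session-level settings and apply to streaming queries started afterwards in the same session. As a quick sanity check before starting the query (a sketch using the standard spark.conf.get API):

# Confirm the async checkpointing settings before starting the stateful query.
assert spark.conf.get(
    "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled") == "true"
assert spark.conf.get(
    "spark.sql.streaming.stateStore.providerClass").endswith("RocksDBStateStoreProvider")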

Limitations and requirements for asynchronous checkpointing

Note

Compute auto-scaling has limitations scaling down cluster size for Structured Streaming workloads. Databricks recommends using Delta Live Tables with Enhanced Autoscaling for streaming workloads. See What is Enhanced Autoscaling?.

  • Any failure in an asynchronous checkpoint in any one or more state stores fails the query. In synchronous checkpointing mode, the checkpoint is executed as part of the task and Spark retries the task multiple times before failing the query; this mechanism is not present with asynchronous state checkpointing. However, such failures can be retried automatically by using Databricks job retries.

  • Asynchronous checkpointing works best when the state store locations do not change between micro-batch executions. Cluster resizing, in combination with asynchronous state checkpointing, may not work well because state store instances might get redistributed as nodes are added or removed during the resizing event.

  • Asynchronous state checkpointing is supported only in the RocksDB state store provider implementation. The default in-memory state store implementation does not support it.