Asynchronous state checkpointing for stateful queries

Note

Available in Databricks Runtime 10.3 and above.

Asynchronous state checkpointing maintains exactly-once guarantees for streaming queries and can reduce overall latency for some Structured Streaming stateful workloads that are bottlenecked on state updates. It does this by starting to process the next micro-batch as soon as the computation of the previous micro-batch completes, without waiting for state checkpointing to finish. The following table compares the tradeoffs of synchronous and asynchronous checkpointing:

| Characteristic | Synchronous checkpointing | Asynchronous checkpointing |
| --- | --- | --- |
| Latency | Higher latency for each micro-batch. | Reduced latency because micro-batches can overlap. |
| Restart | Fast recovery because only the last batch needs to be re-run. | Higher restart delay because more than one micro-batch might need to be re-run. |

The following are streaming job characteristics that might benefit from asynchronous state checkpointing:

  • The job has one or more stateful operations (for example, aggregation, flatMapGroupsWithState, mapGroupsWithState, or stream-stream joins).

  • State checkpoint latency is one of the major contributors to overall batch execution latency. You can find this information in the StreamingQueryProgress events, which also appear in the log4j logs on the Spark driver. The following example shows a streaming query progress event and how to determine the impact of state checkpointing on overall batch execution latency.

       {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
       }
      
    • State checkpoint latency analysis of the query progress event above:

      • Batch duration (durationMs.triggerExecution) is around 547 seconds.

      • State store commit latency (stateOperators[0].commitTimeMs) is around 3,186 seconds. Commit latency is aggregated across the tasks that contain a state store. In this case, there are 64 such tasks (stateOperators[0].numShufflePartitions).

      • Each task containing a state operator took an average of 50 seconds (3,186 / 64) to checkpoint. This latency is added to the batch duration. Assuming all 64 tasks run concurrently, the checkpoint step contributed around 9% (50 seconds / 547 seconds) of the batch duration. The percentage is even higher when fewer than 64 tasks can run concurrently.
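The arithmetic above can be sketched as a small helper that reads the same three fields from a progress event. This is a minimal illustration, not part of any Databricks or Spark API: the function name checkpoint_share and the max_concurrent_tasks parameter are hypothetical, and modeling limited concurrency as evenly sized "waves" of tasks is a simplifying assumption.

```python
import math
from typing import Optional


def checkpoint_share(progress: dict, max_concurrent_tasks: Optional[int] = None) -> dict:
    """Estimate how much of a micro-batch was spent committing state.

    `progress` is a StreamingQueryProgress event parsed into a dict.
    Only the first state operator is inspected, as in the analysis above.
    """
    batch_ms = progress["durationMs"]["triggerExecution"]
    operator = progress["stateOperators"][0]
    commit_ms = operator["commitTimeMs"]          # summed across all state store tasks
    num_tasks = operator["numShufflePartitions"]  # number of tasks holding a state store

    if max_concurrent_tasks is not None and max_concurrent_tasks < 1:
        raise ValueError("max_concurrent_tasks must be at least 1")

    avg_task_commit_ms = commit_ms / num_tasks

    # Assumption: when fewer tasks can run at once, they run in equal waves,
    # and each wave adds one average commit time to the batch.
    concurrency = num_tasks if max_concurrent_tasks is None else min(max_concurrent_tasks, num_tasks)
    waves = math.ceil(num_tasks / concurrency)
    wall_commit_ms = avg_task_commit_ms * waves

    return {
        "batch_s": batch_ms / 1000,
        "avg_task_commit_s": avg_task_commit_ms / 1000,
        "share_of_batch": wall_commit_ms / batch_ms,
    }


# Values taken from the example progress event; elided fields are omitted.
sample_progress = {
    "durationMs": {"triggerExecution": 547730},
    "stateOperators": [{"commitTimeMs": 3186626, "numShufflePartitions": 64}],
}

if __name__ == "__main__":
    print(checkpoint_share(sample_progress))
    print(checkpoint_share(sample_progress, max_concurrent_tasks=32))
```

With all 64 tasks running concurrently, the estimated share is roughly 9%, which matches the analysis. Passing a smaller max_concurrent_tasks shows how the share grows under this simplified model.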

Enabling asynchronous state checkpointing

You must use the RocksDB-based state store for asynchronous state checkpointing. Set the following configurations:

spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
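Because both settings are required, a small check can catch a missing one before a query starts. The following is a minimal sketch rather than a Databricks API: the helper name async_checkpoint_problems is hypothetical, and treating an unset flag as disabled is an assumption. The configuration keys and the provider class name are the ones shown above.

```python
ASYNC_KEY = "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled"
PROVIDER_KEY = "spark.sql.streaming.stateStore.providerClass"
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def async_checkpoint_problems(conf: dict) -> list:
    """Return human-readable problems found in a dict of Spark settings.

    An empty list means both required settings are present.
    """
    problems = []

    # Assumption: a missing flag is treated the same as "false".
    if str(conf.get(ASYNC_KEY, "false")).lower() != "true":
        problems.append(f"{ASYNC_KEY} is not set to true")

    # Asynchronous checkpointing requires the RocksDB state store provider.
    if conf.get(PROVIDER_KEY) != ROCKSDB_PROVIDER:
        problems.append(f"{PROVIDER_KEY} is not set to {ROCKSDB_PROVIDER}")

    return problems


if __name__ == "__main__":
    # Only the async flag is set here, so one problem is reported.
    for problem in async_checkpoint_problems({ASYNC_KEY: "true"}):
        print(problem)
```

In a notebook, the dictionary could be built from the live session, for example by reading each of the two keys with spark.conf.get(key, None).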

Limitations and requirements for asynchronous checkpointing

Note

Compute auto-scaling has limitations when scaling down cluster size for Structured Streaming workloads. Databricks recommends using Delta Live Tables with Enhanced Autoscaling for streaming workloads. See Optimize the cluster utilization of Delta Live Tables pipelines with Enhanced Autoscaling.

  • A failure in an asynchronous checkpoint at any state store fails the query. In synchronous checkpointing mode, the checkpoint runs as part of the task, and Spark retries the task multiple times before failing the query. This mechanism is not present with asynchronous state checkpointing. However, you can use Databricks job retries to retry such failures automatically.

  • Asynchronous checkpointing works best when state store locations do not change between micro-batch executions. Cluster resizing, in combination with asynchronous state checkpointing, might not work well because state store instances might be redistributed as nodes are added or removed during the resizing event.

  • Asynchronous state checkpointing is supported only in the RocksDB state store provider implementation. The default in-memory state store implementation does not support it.
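For the first limitation, job-level retries are configured on the job rather than in the query. The sketch below builds a task definition with retry fields. It assumes the Databricks Jobs API task settings named max_retries and min_retry_interval_millis; verify the field names against the Jobs API reference for your workspace. The helper with_retries, the task key, and the notebook path are hypothetical.

```python
def with_retries(task: dict, max_retries: int = 3, min_retry_interval_ms: int = 60_000) -> dict:
    """Return a copy of a job task definition with retry settings added.

    Field names are assumed from the Databricks Jobs API task settings.
    """
    updated = dict(task)  # shallow copy so the caller's dict is not modified
    updated["max_retries"] = max_retries
    updated["min_retry_interval_millis"] = min_retry_interval_ms
    return updated


if __name__ == "__main__":
    # Hypothetical task that runs the streaming query from a notebook.
    task = {
        "task_key": "stateful_stream",
        "notebook_task": {"notebook_path": "/path/to/streaming_notebook"},
    }
    print(with_retries(task, max_retries=5))
```

A nonzero retry count lets the job restart the query after an asynchronous checkpoint failure, at the cost of re-running any micro-batches whose state was not yet checkpointed.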