Configure RocksDB state store on Databricks

You can enable RocksDB-based state management by setting the following configuration in the SparkSession before starting the streaming query.

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
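The same setting can be wrapped in a small helper that applies it and reads it back before any query starts. This is a minimal sketch, not a Databricks API. `enable_rocksdb_state_store` is a hypothetical helper name, and the sketch assumes `spark` is an existing PySpark `SparkSession`, as in a Databricks notebook. It uses only `spark.conf.set`, `spark.conf.get`, and `spark.streams.active`. Refusing to run while any streaming query is active is a conservative choice made by this sketch, because the text above only says to set the configuration before starting the query.

```python
# Configuration key and provider class copied from the setting shown above.
PROVIDER_KEY = "spark.sql.streaming.stateStore.providerClass"
ROCKSDB_PROVIDER = "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"


def enable_rocksdb_state_store(spark):
    """Hypothetical helper: select the RocksDB state store provider.

    Assumes `spark` is a PySpark SparkSession. As a conservative choice,
    this sketch raises if any streaming query is already active, since the
    setting is meant to be applied before the query starts.
    """
    if spark.streams.active:
        raise RuntimeError(
            "Set the state store provider before starting streaming queries"
        )
    spark.conf.set(PROVIDER_KEY, ROCKSDB_PROVIDER)
    # Read the value back so the caller can confirm the setting took effect.
    return spark.conf.get(PROVIDER_KEY)
```

Call it once, right after creating or obtaining the session and before calling `start()` on any stream writer.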

RocksDB state store metrics

Each state operator collects metrics about the state management operations performed on its RocksDB instance. You can use these metrics to observe the state store and to help debug slow jobs. For each state operator in the job, the metrics are aggregated (summed) across all tasks in which that operator runs. They appear in the customMetrics map inside each entry of the stateOperators field in StreamingQueryProgress. The following is an example of StreamingQueryProgress in JSON form (obtained using StreamingQueryProgress.json()).

{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}
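To work with these metrics programmatically, you can parse the JSON form of the progress report and pull out the customMetrics map of each state operator. This is a minimal sketch using only the Python standard library. It assumes you already have the JSON string (for example, from StreamingQueryProgress.json() as mentioned above), and `state_operator_custom_metrics` is a hypothetical helper name.

```python
import json


def state_operator_custom_metrics(progress_json):
    """Return one customMetrics dict per entry in "stateOperators".

    `progress_json` is assumed to be the JSON text of a StreamingQueryProgress.
    Operators without a customMetrics map yield an empty dict.
    """
    progress = json.loads(progress_json)
    return [
        dict(op.get("customMetrics", {}))
        for op in progress.get("stateOperators", [])
    ]


# A trimmed copy of the example progress report above.
example = """{
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "customMetrics" : {
      "rocksdbGetCount" : 40000000,
      "rocksdbReadBlockCacheHitCount" : 1988
    }
  } ]
}"""

for i, metrics in enumerate(state_operator_custom_metrics(example)):
    print(i, metrics["rocksdbGetCount"])  # prints: 0 40000000
```

Returning a list keeps operators separate, because a query with several stateful operators reports one stateOperators entry per operator.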

Detailed descriptions of the metrics are as follows:

Metric name

Description

rocksdbCommitWriteBatchLatency

Time (in milliseconds) taken to apply the staged writes in the in-memory structure (WriteBatch) to native RocksDB.

rocksdbCommitFlushLatency

Time (in milliseconds) taken to flush the RocksDB in-memory changes to local disk.

rocksdbCommitCompactLatency

Time (in milliseconds) taken by the optional compaction during the checkpoint commit.

rocksdbCommitPauseLatency

Time (in milliseconds) taken to stop the background worker threads (for compaction, etc.) as part of the checkpoint commit.

rocksdbCommitCheckpointLatency

Time (in milliseconds) taken to take a snapshot of native RocksDB and write it to a local directory.

rocksdbCommitFileSyncLatencyMs

Time (in milliseconds) taken to sync the files related to the native RocksDB snapshot to external storage (the checkpoint location).

rocksdbGetLatency

Average time (in nanoseconds) taken per underlying native RocksDB::Get call.

rocksdbPutLatency

Average time (in nanoseconds) taken per underlying native RocksDB::Put call.

rocksdbGetCount

Number of native RocksDB::Get calls (does not include Gets from the WriteBatch, the in-memory batch used for staging writes).

rocksdbPutCount

Number of native RocksDB::Put calls (does not include Puts to the WriteBatch, the in-memory batch used for staging writes).

rocksdbTotalBytesReadByGet

Number of uncompressed bytes read through native RocksDB::Get calls.

rocksdbTotalBytesWrittenByPut

Number of uncompressed bytes written through native RocksDB::Put calls.

rocksdbReadBlockCacheHitCount

Number of times the native RocksDB block cache is used to avoid reading data from local disk.

rocksdbReadBlockCacheMissCount

Number of times the native RocksDB block cache missed and required reading data from local disk.

rocksdbTotalBytesReadByCompaction

Number of bytes read from the local disk by the native RocksDB compaction process.

rocksdbTotalBytesWrittenByCompaction

Number of bytes written to the local disk by the native RocksDB compaction process.

rocksdbTotalCompactionLatencyMs

Time (in milliseconds) taken by RocksDB compactions (both background compactions and the optional compaction initiated during the commit).

rocksdbWriterStallLatencyMs

Time (in milliseconds) the writer stalled due to a background compaction or flushing of the memtables to disk.

rocksdbTotalBytesReadThroughIterator

Total size of uncompressed data read through an iterator. Some stateful operations (such as timeout processing in flatMapGroupsWithState or watermarking in windowed aggregations) require reading all of the data in the DB through an iterator.
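Several of these metrics are more informative when combined. The sketch below computes three derived values from one operator's customMetrics map: the block cache hit ratio, the average uncompressed bytes read per Get call, and the sum of the commit-phase timings (all six are reported in milliseconds). These formulas are this sketch's own choices, not metrics that Databricks reports, and `derived_rocksdb_stats` is a hypothetical helper name. Because the inputs are summed across tasks, the results describe the whole operator rather than a single task.

```python
# Commit-phase metrics from the table above; each is reported in milliseconds.
COMMIT_PHASES = (
    "rocksdbCommitWriteBatchLatency",
    "rocksdbCommitFlushLatency",
    "rocksdbCommitCompactLatency",
    "rocksdbCommitPauseLatency",
    "rocksdbCommitCheckpointLatency",
    "rocksdbCommitFileSyncLatencyMs",
)


def derived_rocksdb_stats(m):
    """Hypothetical helper: derive a few ratios from one customMetrics map.

    Missing metrics count as 0; ratios with a zero denominator are None.
    """
    hits = m.get("rocksdbReadBlockCacheHitCount", 0)
    misses = m.get("rocksdbReadBlockCacheMissCount", 0)
    gets = m.get("rocksdbGetCount", 0)
    return {
        # Fraction of block cache lookups that avoided reading local disk.
        "blockCacheHitRatio": hits / (hits + misses) if hits + misses else None,
        # Average uncompressed bytes read per native RocksDB::Get call.
        "avgBytesPerGet": (
            m.get("rocksdbTotalBytesReadByGet", 0) / gets if gets else None
        ),
        # Sum of the commit-phase timings, in milliseconds.
        "commitPhasesMs": sum(m.get(k, 0) for k in COMMIT_PHASES),
    }


# Values copied from the example customMetrics map above.
example = {
    "rocksdbReadBlockCacheHitCount": 1988,
    "rocksdbReadBlockCacheMissCount": 40341617,
    "rocksdbGetCount": 40000000,
    "rocksdbTotalBytesReadByGet": 680000000,
    "rocksdbCommitWriteBatchLatency": 56155,
    "rocksdbCommitFlushLatency": 19032,
    "rocksdbCommitCompactLatency": 22061,
    "rocksdbCommitPauseLatency": 0,
    "rocksdbCommitCheckpointLatency": 2,
    "rocksdbCommitFileSyncLatencyMs": 1710,
}
stats = derived_rocksdb_stats(example)
print(stats["avgBytesPerGet"], stats["commitPhasesMs"])  # prints: 17.0 98960
```

With the example values, the hit ratio is about 0.005%, meaning nearly every block lookup had to read from local disk.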