Monitoring Structured Streaming queries on Databricks

Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Give each stream a unique query name by adding .queryName(<query-name>) to your writeStream code so you can easily distinguish which metrics belong to which stream in the Spark UI.
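
For example, a minimal sketch (the table names and checkpoint path here are hypothetical):

Scala
spark.readStream
  .table("events_raw")  // hypothetical source table
  .writeStream
  .queryName("events_bronze_ingest")  // this name appears in the Spark UI Streaming tab
  .option("checkpointLocation", "/tmp/checkpoints/events_bronze_ingest")
  .toTable("events_bronze")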

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding use cases by using Apache Spark's Streaming Query Listener interface. In Databricks Runtime 11.3 LTS and above, StreamingQueryListener is available in Python and Scala.

important

The following limitations apply for workloads using Unity Catalog-enabled compute access modes:

  • StreamingQueryListener requires Databricks Runtime 15.1 or above to use credentials or interact with objects managed by Unity Catalog on compute with dedicated access mode.
  • StreamingQueryListener requires Databricks Runtime 16.1 or above for Scala workloads configured with standard access mode (formerly shared access mode).
note

Processing latency from listeners can significantly affect query processing speeds. Limit the processing logic in these listeners and write to fast-response systems, such as Kafka, for efficiency.

The following code provides basic examples of the syntax for implementing a listener:

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   * [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
   * that is, `onQueryStarted` is called on all listeners before
   * `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   * Do not block this method, as it blocks your query.
   */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
   * latest status, regardless of when this method is called. The status of [[StreamingQuery]]
   * may change before or when you process the event. For example, you may find [[StreamingQuery]]
   * terminates when processing `QueryProgressEvent`.
   */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when the query is idle and waiting for new data to process.
   */
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
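
To push these metrics to an external service, register the listener on the Spark session and keep the per-event work minimal. The following sketch forwards each progress event as JSON to a hypothetical sendToMonitoring function, assumed to wrap a low-latency producer such as a Kafka client:

Scala
// sendToMonitoring is a hypothetical function, for example a thin wrapper around a Kafka producer.
def sendToMonitoring(json: String): Unit = ???

spark.streams.addListener(new StreamingQueryListener {
  def onQueryStarted(event: QueryStartedEvent): Unit = {}
  // Serialize the full progress report and hand it off; avoid heavy processing in this callback.
  def onQueryProgress(event: QueryProgressEvent): Unit = sendToMonitoring(event.progress.json)
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})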

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or microbatch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support the continuous trigger mode for streaming.

For example:

Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so wrap the lookup in Option to handle a missing key
    Option(event.progress.observedMetrics.get("my_event")).foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
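
For batch mode, a similar check can be wired up with QueryExecutionListener. The following is a minimal sketch, assuming a batch DataFrame df with an error column and a hypothetical output path:

Scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{col, count, lit}
import org.apache.spark.sql.util.QueryExecutionListener

// Register a listener that reads the observed metrics when the batch query completes
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.observedMetrics.get("my_batch_event").foreach { row =>
      val numRows = row.getAs[Long]("rc")
      val numErrorRows = row.getAs[Long]("erc")
      if (numRows > 0 && numErrorRows.toDouble / numRows > 0.05) {
        // Trigger alert
      }
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
})

// Observe the same aggregates on the batch DataFrame and trigger execution with a write
val observed_df = df.observe("my_batch_event", count(lit(1)).as("rc"), count(col("error")).as("erc"))
observed_df.write.format("delta").save("/tmp/observed-output")  // hypothetical path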

Map Unity Catalog, Delta Lake, and Structured Streaming metrics table identifiers

Structured Streaming metrics use the reservoirId field in several places for the unique identity of a Delta table used as a source for a streaming query.

The reservoirId field maps to the unique identifier stored by the Delta table in the Delta transaction log. This ID does not map to the tableId value assigned by Unity Catalog and displayed in Catalog Explorer.

Use the following syntax to review the table identifier for a Delta table. This works for Unity Catalog managed tables, Unity Catalog external tables, and all Hive metastore Delta tables:

SQL
DESCRIBE DETAIL <table-name>

The id field displayed in the results is the identifier that maps to the reservoirId in the streaming metrics.
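
For example, the following sketch (using a hypothetical table name) retrieves the id so you can compare it with the reservoirId values reported in streaming metrics:

Scala
// Hypothetical table name; the returned id column matches reservoirId in streaming metrics
val detail = spark.sql("DESCRIBE DETAIL main.default.my_delta_table")
val deltaTableId = detail.select("id").first().getString(0)
println(s"Delta table id (reservoirId): $deltaTableId")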

StreamingQueryListener object metrics

Fields

Description

id

A unique query ID that persists across restarts.

runId

A query ID that is unique for every start/restart. See StreamingQuery.runId().

name

The user-specified name of the query. Name is null if no name is specified.

timestamp

The timestamp for the execution of the microbatch.

batchId

A unique ID for the current batch of data being processed. In the case of retries after a failure, a given batch ID may be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented.

batchDuration

The processing duration of a batch operation, in milliseconds.

numInputRows

The aggregate (across all sources) number of records processed in a trigger.

inputRowsPerSecond

The aggregate (across all sources) rate of arriving data.

processedRowsPerSecond

The aggregate (across all sources) rate at which Spark is processing data.

StreamingQueryListener also defines the following fields, which contain objects you can examine for custom metrics and source progress details:

Fields

Description

durationMs

Type: ju.Map[String, JLong]. See durationMs object.

eventTime

Type: ju.Map[String, String]. See eventTime object.

stateOperators

Type: Array[StateOperatorProgress]. See stateOperators object.

sources

Type: Array[SourceProgress]. See sources object.

sink

Type: SinkProgress. See sink object.

observedMetrics

Type: ju.Map[String, Row]. Named arbitrary aggregate functions that can be defined on a DataFrame/query (such as df.observe).
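
For example, a sketch of a listener that reads a few of these fields from the progress object (field availability can vary by source, sink, and runtime):

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  def onQueryStarted(event: QueryStartedEvent): Unit = {}
  def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    val triggerMs = p.durationMs.get("triggerExecution")                     // from durationMs (may be null)
    val totalStateRows = p.stateOperators.map(_.numRowsTotal).sum            // from stateOperators
    val rowsPerSource = p.sources.map(s => s.description -> s.numInputRows)  // from sources
    // Log or forward the values; keep the work in this callback lightweight.
    println(s"batch=${p.batchId} trigger=${triggerMs}ms stateRows=$totalStateRows sources=${rowsPerSource.toMap}")
  }
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})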

durationMs object

Object type: ju.Map[String, JLong]

Information about the time it takes to complete various stages of the microbatch execution process.

Fields

Description

durationMs.addBatch

The time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch.

durationMs.getBatch

The time it takes to retrieve the metadata about the offsets from the source.

durationMs.latestOffset

The time taken to retrieve the latest offset from the sources for the microbatch.

durationMs.queryPlanning

The time taken to generate the execution plan.

durationMs.triggerExecution

The time it takes to plan and execute the microbatch.

durationMs.walCommit

The time taken to commit the new available offsets.

durationMs.commitBatch

The time taken to commit the data written to the sink during addBatch. Only present for sinks that support commit.

durationMs.commitOffsets

The time taken to commit the batch to the commit log.

eventTime object

Object type: ju.Map[String, String]

Information about the event time value seen within the data being processed in the microbatch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.

Fields

Description

eventTime.avg

The average event time seen in that trigger.

eventTime.max

The maximum event time seen in that trigger.

eventTime.min

The minimum event time seen in that trigger.

eventTime.watermark

The value of the watermark used in that trigger.

stateOperators object

Object type: Array[StateOperatorProgress]

The stateOperators object contains information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.

For more details on stream state operators, see What is stateful streaming?.

Fields

Description

stateOperators.operatorName

The name of the stateful operator to which the metrics relate, such as symmetricHashJoin, dedupe, or stateStoreSave.

stateOperators.numRowsTotal

The total number of rows in state as a result of a stateful operator or aggregation.

stateOperators.numRowsUpdated

The total number of rows updated in state as a result of a stateful operator or aggregation.

stateOperators.allUpdatesTimeMs

This metric is currently not measurable by Spark and is planned to be removed in future updates.

stateOperators.numRowsRemoved

The total number of rows removed from state as a result of a stateful operator or aggregation.

stateOperators.allRemovalsTimeMs

This metric is currently not measurable by Spark and is planned to be removed in future updates.

stateOperators.commitTimeMs

The time taken to commit all updates (puts and removes) and return a new version.

stateOperators.memoryUsedBytes

Memory used by the state store.

stateOperators.numRowsDroppedByWatermark

The number of rows considered too late to be included in a stateful aggregation. For streaming aggregations only: rows are counted post-aggregation, not as raw input rows. The number is not precise, but it indicates that late data is being dropped.

stateOperators.numShufflePartitions

The number of shuffle partitions for this stateful operator.

stateOperators.numStateStoreInstances

The number of state store instances that the operator has initialized and maintains. For many stateful operators, this is the same as the number of partitions. However, stream-stream joins initialize four state store instances per partition.

stateOperators.customMetrics

See the StateOperatorProgress.customMetrics object in this topic for more details.
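
For example, a sketch that scans stateOperators in a progress event and flags operators that are dropping late rows:

Scala
import org.apache.spark.sql.streaming.StreamingQueryProgress

def checkLateData(p: StreamingQueryProgress): Unit = {
  p.stateOperators.foreach { op =>
    if (op.numRowsDroppedByWatermark > 0) {
      // Late data is being dropped by this stateful operator; consider widening the watermark
      println(s"${op.operatorName}: dropped ${op.numRowsDroppedByWatermark} rows, " +
        s"state rows=${op.numRowsTotal}, memory=${op.memoryUsedBytes} bytes")
    }
  }
}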

StateOperatorProgress.customMetrics object

Object type: ju.Map[String, JLong]

StateOperatorProgress has a field, customMetrics, which contains metrics specific to the feature you are using.

Feature

Description

RocksDB state store

Metrics for RocksDB state store.

HDFS state store

Metrics for HDFS state store.

Stream deduplication

Metrics for row deduplication.

Stream aggregation

Metrics for row aggregation.

Stream join operator

Metrics for stream join operator.

transformWithState

Metrics for transformWithState operator.

RocksDB state store custom metrics

Information collected from RocksDB capturing metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Databricks.

Fields

Description

customMetrics.rocksdbBytesCopied

The number of bytes copied as tracked by the RocksDB File Manager.

customMetrics.rocksdbCommitCheckpointLatency

The time in milliseconds taken to snapshot the native RocksDB instance and write it to a local directory.

customMetrics.rocksdbCompactLatency

The time in milliseconds spent on (optional) compaction during the checkpoint commit.

customMetrics.rocksdbCommitCompactLatency

The compaction time during commit, in milliseconds.

customMetrics.rocksdbCommitFileSyncLatencyMs

The time in milliseconds syncing the native RocksDB snapshot to external storage (the checkpoint location).

customMetrics.rocksdbCommitFlushLatency

The time in milliseconds flushing the RocksDB in-memory changes to the local disk.

customMetrics.rocksdbCommitPauseLatency

The time in milliseconds stopping the background worker threads as part of the checkpoint commit, such as for compaction.

customMetrics.rocksdbCommitWriteBatchLatency

The time in milliseconds applying the staged writes in the in-memory structure (WriteBatch) to native RocksDB.

customMetrics.rocksdbFilesCopied

The number of files copied as tracked by the RocksDB File Manager.

customMetrics.rocksdbFilesReused

The number of files reused as tracked by the RocksDB File Manager.

customMetrics.rocksdbGetCount

The number of get calls (does not include gets from WriteBatch - in-memory batch used for staging writes).

customMetrics.rocksdbGetLatency

The average time in nanoseconds for the underlying native RocksDB::Get call.

customMetrics.rocksdbReadBlockCacheHitCount

The count of cache hits from the block cache in RocksDB.

customMetrics.rocksdbReadBlockCacheMissCount

The count of the block cache misses in RocksDB.

customMetrics.rocksdbSstFileSize

The size of all Static Sorted Table (SST) files in the RocksDB instance.

customMetrics.rocksdbTotalBytesRead

The number of uncompressed bytes read by get operations.

customMetrics.rocksdbTotalBytesWritten

The total number of uncompressed bytes written by put operations.

customMetrics.rocksdbTotalBytesReadThroughIterator

The total number of bytes of uncompressed data read using an iterator. Some stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data from RocksDB through an iterator.

customMetrics.rocksdbTotalBytesReadByCompaction

The number of bytes that the compaction process reads from the disk.

customMetrics.rocksdbTotalBytesWrittenByCompaction

The total number of bytes the compaction process writes to the disk.

customMetrics.rocksdbTotalCompactionLatencyMs

The time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit.

customMetrics.rocksdbTotalFlushLatencyMs

The total flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it's full. MemTables are the first level where data is stored in RocksDB.

customMetrics.rocksdbZipFileBytesUncompressed

The size in bytes of the uncompressed zip files as reported by the File Manager. The File Manager manages the physical SST file disk space utilization and deletion.

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

The most recent version of the RocksDB snapshot saved to the checkpoint location. A value of "-1" indicates that no snapshot has ever been saved. Since snapshots are specific to each state store instance, this metric applies to a particular partition ID and state store name.

customMetrics.rocksdbPutLatency

The total put call latency.

customMetrics.rocksdbPutCount

The number of put calls.

customMetrics.rocksdbWriterStallLatencyMs

The writer wait time for compaction or flush to finish.

customMetrics.rocksdbTotalBytesWrittenByFlush

The total bytes written by flush.

customMetrics.rocksdbPinnedBlocksMemoryUsage

The memory usage for pinned blocks.

customMetrics.rocksdbNumInternalColFamiliesKeys

The number of internal keys for internal column families.

customMetrics.rocksdbNumExternalColumnFamilies

The number of external column families.

customMetrics.rocksdbNumInternalColumnFamilies

The number of internal column families.
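
For example, a sketch that pulls a couple of these RocksDB values out of a state operator's customMetrics map (keys can be absent depending on the runtime and configuration):

Scala
import org.apache.spark.sql.streaming.StateOperatorProgress

def rocksdbSummary(op: StateOperatorProgress): String = {
  // customMetrics is a java.util.Map[String, java.lang.Long]; guard against missing keys
  val sstBytes = Option(op.customMetrics.get("rocksdbSstFileSize")).map(_.longValue).getOrElse(0L)
  val stallMs  = Option(op.customMetrics.get("rocksdbWriterStallLatencyMs")).map(_.longValue).getOrElse(0L)
  s"${op.operatorName}: sstFileSize=$sstBytes bytes, writerStall=${stallMs}ms"
}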

HDFS state store custom metrics

Information collected about HDFS state store provider behaviors and operations.

Fields

Description

customMetrics.stateOnCurrentVersionSizeBytes

The estimated size of the state on the current version only.

customMetrics.loadedMapCacheHitCount

The count of cache hits on states cached in the provider.

customMetrics.loadedMapCacheMissCount

The count of cache misses on states cached in the provider.

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

The last uploaded version of the snapshot for a specific state store instance.

Deduplication custom metrics

Information collected about deduplication behaviors and operations.

Fields

Description

customMetrics.numDroppedDuplicateRows

The number of duplicate rows dropped.

customMetrics.numRowsReadDuringEviction

The number of state rows read during state eviction.

Aggregation custom metrics

Information collected about aggregation behaviors and operations.

Fields

Description

customMetrics.numRowsReadDuringEviction

The number of state rows read during state eviction.

Stream join custom metrics

Information collected about stream join behaviors and operations.

Fields

Description

customMetrics.skippedNullValueCount

The number of skipped null values, when spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled is set to true.

transformWithState custom metrics

Information collected about transformWithState (TWS) behaviors and operations. For more details on transformWithState, see Build a custom stateful application.

Fields

Description

customMetrics.initialStateProcessingTimeMs

Number of milliseconds taken to process all initial state.

customMetrics.numValueStateVars

Number of value state variables. Also present for transformWithStateInPandas.

customMetrics.numListStateVars

Number of list state variables. Also present for transformWithStateInPandas.

customMetrics.numMapStateVars

Number of map state variables. Also present for transformWithStateInPandas.

customMetrics.numDeletedStateVars

Number of deleted state variables. Also present for transformWithStateInPandas.

customMetrics.timerProcessingTimeMs

Number of milliseconds taken to process all timers.

customMetrics.numRegisteredTimers

Number of registered timers. Also present for transformWithStateInPandas.

customMetrics.numDeletedTimers

Number of deleted timers. Also present for transformWithStateInPandas.

customMetrics.numExpiredTimers

Number of expired timers. Also present for transformWithStateInPandas.

customMetrics.numValueStateWithTTLVars

Number of value state variables with TTL. Also present for transformWithStateInPandas.

customMetrics.numListStateWithTTLVars

Number of list state variables with TTL. Also present for transformWithStateInPandas.

customMetrics.numMapStateWithTTLVars

Number of map state variables with TTL. Also present for transformWithStateInPandas.

customMetrics.numValuesRemovedDueToTTLExpiry

Number of values removed due to TTL expiry. Also present for transformWithStateInPandas.

customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry

Number of values incrementally removed due to TTL expiry.

sources object

Object type: Array[SourceProgress]

The sources object contains information and metrics for streaming data sources.

Fields

Description

description

A detailed description of the streaming data source table.

startOffset

The starting offset number within the data source table at which the streaming job started.

endOffset

The last offset processed by the microbatch.

latestOffset

The latest offset processed by the microbatch.

numInputRows

The number of input rows processed from this source.

inputRowsPerSecond

The rate at which data is arriving for processing from this source, in rows per second.

processedRowsPerSecond

The rate at which Spark is processing data from this source.

metrics

Type: ju.Map[String, String]. Contains custom metrics for a specific data source.

Databricks provides the following sources object implementations:

note

For fields written in the form sources.<startOffset / endOffset / latestOffset>.* (or some variation), read the name as any of the following (up to three) possible fields, each containing the indicated child field:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake sources object

Definitions for custom metrics used for Delta table streaming data sources.

Fields

Description

sources.description

The description of the source from which the streaming query is reading. For example: “DeltaSource[table]”.

sources.<startOffset / endOffset>.sourceVersion

The version of serialization with which this offset is encoded.

sources.<startOffset / endOffset>.reservoirId

The ID of the table being read. This is used to detect misconfiguration when restarting a query. See Map Unity Catalog, Delta Lake, and Structured Streaming metrics table identifiers.

sources.<startOffset / endOffset>.reservoirVersion

The version of the table that is currently being processed.

sources.<startOffset / endOffset>.index

The index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path.

sources.<startOffset / endOffset>.isStartingVersion

Identifies whether the current offset marks the start of a new streaming query rather than the processing of changes that occurred after the initial data was processed. When starting a new query, all data present in the table at the start is processed first, and then any new data that arrives.

sources.<startOffset / endOffset / latestOffset>.eventTimeMillis

The event time recorded for event time ordering: the event time of initial snapshot data that is pending processing. Used when processing an initial snapshot with event time ordering.

sources.latestOffset

The latest offset processed by the microbatch query.

sources.numInputRows

The number of input rows processed from this source.

sources.inputRowsPerSecond

The rate at which data is arriving for processing from this source.

sources.processedRowsPerSecond

The rate at which Spark is processing data from this source.

sources.metrics.numBytesOutstanding

The combined size of the outstanding files (files tracked by RocksDB). This is the backlog metric for Delta and Auto Loader as the streaming source.

sources.metrics.numFilesOutstanding

The number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source.
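
For example, a sketch that sums the Delta and Auto Loader backlog reported in a progress event's sources (the metric key is absent for sources that do not report it, and values are reported as strings):

Scala
import org.apache.spark.sql.streaming.StreamingQueryProgress

def backlogBytes(p: StreamingQueryProgress): Long =
  p.sources.flatMap(s => Option(s.metrics.get("numBytesOutstanding"))).map(_.toLong).sum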

Apache Kafka sources object

Definitions for custom metrics used for Apache Kafka streaming data sources.

Fields

Description

sources.description

A detailed description of the Kafka source, specifying the exact Kafka topic being read from. For example: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.

sources.startOffset

The starting offset number within the Kafka topic at which the streaming job started.

sources.endOffset

The last offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution.

sources.latestOffset

The latest offset computed by the microbatch. The microbatching process might not process all offsets when there is throttling, which results in endOffset and latestOffset differing.

sources.numInputRows

The number of input rows processed from this source.

sources.inputRowsPerSecond

The rate at which data is arriving for processing from this source.

sources.processedRowsPerSecond

The rate at which Spark is processing data from this source.

sources.metrics.avgOffsetsBehindLatest

The average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sources.metrics.estimatedTotalBytesBehindLatest

The estimated number of bytes that the query process has not consumed from the subscribed topics.

sources.metrics.maxOffsetsBehindLatest

The maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sources.metrics.minOffsetsBehindLatest

The minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
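
For example, a sketch that reads the maximum Kafka offset lag across sources in a progress event (the metric value is reported as a string):

Scala
import org.apache.spark.sql.streaming.StreamingQueryProgress

def maxKafkaOffsetLag(p: StreamingQueryProgress): Long =
  p.sources
    .filter(_.description.startsWith("KafkaV2"))
    .flatMap(s => Option(s.metrics.get("maxOffsetsBehindLatest")))
    .map(_.toLong)
    .foldLeft(0L)((a, b) => math.max(a, b))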

Auto Loader sources metrics

Definitions for custom metrics used for Auto Loader streaming data sources.

Fields

Description

sources.<startOffset / endOffset / latestOffset>.seqNum

The current position in the sequence of files being processed in the order the files were discovered.

sources.<startOffset / endOffset / latestOffset>.sourceVersion

The implementation version of the cloudFiles source.

sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs

The start time of the most recent backfill operation.

sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs

The end time of the most recent backfill operation.

sources.<startOffset / endOffset / latestOffset>.lastInputPath

The last user-provided input path of the stream before the stream was restarted.

sources.metrics.numFilesOutstanding

The number of files in the backlog.

sources.metrics.numBytesOutstanding

The size, in bytes, of the files in the backlog.

sources.metrics.approximateQueueSize

The approximate size of the message queue. Only present when the cloudFiles.useNotifications option is enabled.

PubSub sources metrics

Definitions for custom metrics used for PubSub streaming data sources. For more details on monitoring PubSub streaming sources, see Monitoring streaming metrics.

Fields

Description

sources.<startOffset / endOffset / latestOffset>.sourceVersion

The implementation version that this offset is encoded with.

sources.<startOffset / endOffset / latestOffset>.seqNum

The persisted sequence number that is being processed.

sources.<startOffset / endOffset / latestOffset>.fetchEpoch

The largest fetch epoch being processed.

sources.metrics.numRecordsReadyToProcess

The number of records available for processing in the current backlog.

sources.metrics.sizeOfRecordsReadyToProcess

The total size, in bytes, of unprocessed data in the current backlog.

sources.metrics.numDuplicatesSinceStreamStart

The total count of duplicate records processed by the stream since it started.

Pulsar sources metrics

Definitions for custom metrics used for Pulsar streaming data sources.

Fields

Description

sources.metrics.numInputRows

The number of rows processed in the current micro-batch.

sources.metrics.numInputBytes

The total number of bytes processed in the current micro-batch.

sink object

Object type: SinkProgress

Fields

Description

sink.description

The description of the sink, detailing the specific sink implementation being used.

sink.numOutputRows

The number of output rows. Different sink types can have different behaviors or restrictions for this value. See the specific sink object implementations below.

sink.metrics

Type: ju.Map[String, String]. A map of sink metrics.

Currently, Databricks provides two specific sink object implementations:

Sink type

Details

Delta table

See Delta Lake sink object.

Apache Kafka topic

See Apache Kafka sink object.

The sink.metrics field behaves the same for both variants of the sink object.

Delta Lake sink object

Fields

Description

sink.description

The description of the Delta sink, detailing the specific Delta sink implementation being used. For example: “DeltaSink[table]”.

sink.numOutputRows

The number of rows is always -1 because Spark can't infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink.

Apache Kafka sink object

Fields

Description

sink.description

The description of the Kafka sink to which the streaming query is writing, detailing the specific Kafka sink implementation being used. For example: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.

sink.numOutputRows

The number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”.

Examples

Example Kafka-to-Kafka StreamingQueryListener event

JSON
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}

Example Delta Lake-to-Delta Lake StreamingQueryListener event

JSON
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}

Example Kinesis-to-Delta Lake StreamingQueryListener event

JSON
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}

Example Kafka+Delta Lake-to-Delta Lake StreamingQueryListener event

JSON
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}

Example rate source to Delta Lake StreamingQueryListener event

JSON
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}