メインコンテンツまでスキップ

モニタリング 構造化ストリーミング クエリ on Databricks

は、 ストリーミング タブの下のDatabricks を通じて、構造化ストリーミング アプリケーションの組み込み モニタリングを提供します。Spark UI

Spark UI での構造化ストリーミング クエリの区別

writeStream コードに .queryName(<query-name>) を追加してストリームに一意のクエリ名を付けると、 Spark UI内のどのメトリクスがどのストリームに属しているかを簡単に区別できます。

Push 構造化ストリーミング メトリクスを外部サービス

ストリーミングメトリクスは、Apache Spark のストリーミングクエリリスナーインターフェースを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、 StreamingQueryListener を Python と Scala で使用できます。

important

Unity Catalog 対応のコンピュート アクセス モードを使用するワークロードには、次の制限が適用されます。

  • StreamingQueryListener Databricks Runtime 15.1 以降で、認証情報を使用したり、コンピュートの Unity Catalog によって管理されるオブジェクトと対話したりするには、専用アクセスモードが必要です。
  • StreamingQueryListener 標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードには、Databricks Runtime 16.1 以降が必要です。
注記

リスナーによる処理の遅延は、クエリの処理速度に大きな影響を与える可能性があります。 これらのリスナーの処理ロジックを制限し、効率のために Kafka などの高速応答システムに書き込むことを選択することをお勧めします。

次のコードは、リスナーを実装するための構文の基本的な例を示しています。

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}

/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}

/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}

/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

構造化ストリーミングでの観測可能なメトリクスの定義

観測可能なメトリクスは、クエリ (データフレーム) で定義できる任意の集計関数と呼ばれます。 データフレームの実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが発行されます。

これらのメトリクスは、 Spark セッションにリスナーをアタッチすることで観察できます。 リスナーは実行モードによって異なります。

  • バッチモード : QueryExecutionListenerを使用します。

    QueryExecutionListener は、クエリの完了時に呼び出されます。 QueryExecution.observedMetricsマップを使用してメトリクスにアクセスします。

  • ストリーミングまたはマイクロバッチ : StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetricsマップを使用してメトリクスにアクセスします。Databricks は、ストリーミングの continuous トリガー モードをサポートしていません。

例えば:

Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})

Map Unity Catalog、 Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子

構造化ストリーミング メトリクスは、ストリーミング クエリのソース として使用される Delta テーブルの一意の ID に、いくつかの場所で reservoirId フィールドを使用します。

reservoirId フィールドは、Delta テーブルによって Delta トランザクション・ログに格納されたユニーク ID をマップします。この ID は、 Unity Catalog によって割り当てられ、Catalog Explorer に表示される tableId 値にはマップされません。

次の構文を使用して、Delta テーブルのテーブル識別子を確認します。 これは、 Unity Catalog マネージドテーブル、 Unity Catalog 外部テーブル、およびすべての Hive metastore Delta テーブルで機能します。

SQL
DESCRIBE DETAIL <table-name>

結果に表示される id フィールドは、ストリーミング メトリクスの reservoirId にマッピングされる識別子です。

StreamingQueryListener オブジェクトメトリクス

メトリクス

説明

id

再起動後も保持される一意のクエリ ID。

runId

起動/再起動のたびに固有のクエリID。 StreamingQuery.runId() を参照してください。

name

クエリのユーザー指定の名前。 名前が指定されていない場合、名前は null です。

timestamp

マイクロバッチの実行のタイムスタンプ。

batchId

処理中のデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行される可能性があります。 同様に、処理するデータがない場合、バッチ ID は増分されません。

numInputRows

トリガーで処理されたレコードの合計 (すべてのソースにわたる) 数。

inputRowsPerSecond

到着データの集計 (すべてのソースにわたる) 率。

processedRowsPerSecond

Spark がデータを処理している集計 (すべてのソースにわたる) レート。

durationMs オブジェクト

マイクロバッチ実行プロセスのさまざまなステージを完了するのにかかる時間に関する情報。

メトリクス

説明

durationMs.addBatch

マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画にかかる時間は含まれません。

durationMs.getBatch

ソースからオフセットに関するメタデータを取得するのにかかる時間。

durationMs.latestOffset

マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を参照します。

durationMs.queryPlanning

実行プランの生成にかかった時間。

durationMs.triggerExecution

マイクロバッチの計画と実行にかかる時間。

durationMs.walCommit

新しく使用可能なオフセットをコミットするのにかかった時間。

eventTime オブジェクト

マイクロバッチで処理されているデータ内で見られるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフルな集計を処理するための状態をトリミングする方法を理解するために、ウォーターマークによって使用されます。

メトリクス

説明

eventTime.avg

そのトリガーで確認された平均イベント時間。

eventTime.max

そのトリガーで表示される最大イベント時間。

eventTime.min

そのトリガーで表示される最小イベント時間。

eventTime.watermark

そのトリガーで使用されるウォーターマークの値。

stateOperators オブジェクト

構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報。

メトリクス

説明

stateOperators.operatorName

メトリクスが関連するステートフル演算子の名前 ( symmetricHashJoindedupestateStoreSaveなど)

stateOperators.numRowsTotal

ステートフルな演算子または集計の結果としての状態の行の合計数。

stateOperators.numRowsUpdated

ステートフルな演算子または集計の結果として state で更新された行の合計数。

stateOperators.allUpdatesTimeMs

このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。

stateOperators.numRowsRemoved

ステートフルな演算子または集計の結果として state から削除された行の合計数。

stateOperators.allRemovalsTimeMs

このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。

stateOperators.commitTimeMs

すべての更新 (put と remove) をコミットし、新しいバージョンを返すのにかかる時間。

stateOperators.memoryUsedBytes

状態ストアが使用するメモリです。

stateOperators.numRowsDroppedByWatermark

ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ : 集計後に削除された行の数 (未加工の入力行ではない)。 この数値は正確ではありませんが、遅延データがドロップされていることを示しています。

stateOperators.numShufflePartitions

このステートフル オペレーターのシャッフル パーティションの数。

stateOperators.numStateStoreInstances

オペレーターが初期化して保守した実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じです。 ただし、ストリーム-ストリーム結合は、パーティションごとに 4 つの状態ストア インスタンスを初期化します。

stateOperators.customMetrics オブジェクト

RocksDB Capturing メトリクスから収集された、構造化ストリーミング ジョブに対して保持するステートフルな値に関するパフォーマンスと操作に関する情報。詳細については、「RocksDBでの 状態ストアの構成Databricks 」を参照してください。

メトリクス

説明

customMetrics.rocksdbBytesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたバイト数。

customMetrics.rocksdbCommitCheckpointLatency

ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。

customMetrics.rocksdbCompactLatency

チェックポイントのコミット中の圧縮時間 (ミリ秒単位) (オプション)。

customMetrics.rocksdbCommitFileSyncLatencyMs

ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。

customMetrics.rocksdbCommitFlushLatency

RocksDB インメモリをフラッシュする時間 (ミリ秒単位) がローカル ディスクに変わります。

customMetrics.rocksdbCommitPauseLatency

チェックポイント・コミットの一部としてバックグラウンド・ワーカー・スレッドを停止している時間 (ミリ秒単位) (コンパクションなど)。

customMetrics.rocksdbCommitWriteBatchLatency

ステージングされた書き込みをインメモリ構造 (WriteBatch) でネイティブ RocksDB に適用する時間 (ミリ秒単位)。

customMetrics.rocksdbFilesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたファイルの数。

customMetrics.rocksdbFilesReused

RocksDB ファイル マネージャーによって追跡された再利用されたファイルの数。

customMetrics.rocksdbGetCount

DB へのget呼び出しの数 (WriteBatch からの gets (ステージング書き込みに使用されるメモリ内バッチは含まれません)。

customMetrics.rocksdbGetLatency

基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。

customMetrics.rocksdbReadBlockCacheHitCount

RocksDB のブロックキャッシュからのキャッシュヒット数で、ローカルディスクの読み取りを回避するのに役立ちます。

customMetrics.rocksdbReadBlockCacheMissCount

RocksDBのブロックキャッシュの数は、ローカルディスクの読み取りを回避するのに役立ちません。

customMetrics.rocksdbSstFileSize

すべての静的ソートテーブル(SST)ファイルのサイズ - RocksDBがデータを格納するために使用する表形式構造。

customMetrics.rocksdbTotalBytesRead

get操作によって読み取られた非圧縮バイト数。

customMetrics.rocksdbTotalBytesReadByCompaction

コンパクション・プロセスがディスクから読み取るバイト数。

customMetrics.rocksdbTotalBytesReadThroughIterator

イテレータを使用して読み取られた非圧縮データの合計バイト数。 一部のステートフル操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマークなど) では、イテレータを介して DB のデータを読み取る必要があります。

customMetrics.rocksdbTotalBytesWritten

put操作によって書き込まれた非圧縮バイトの合計数。

customMetrics.rocksdbTotalBytesWrittenByCompaction

圧縮プロセスがディスクに書き込む合計バイト数。

customMetrics.rocksdbTotalCompactionLatencyMs

RocksDB コンパクションの時間 (ミリ秒単位) (バックグラウンドコンパクションとコミット中に開始されたオプションのコンパクションを含む)。

customMetrics.rocksdbTotalFlushLatencyMs

バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、 MemTable がいっぱいになったときにストレージにフラッシュされるプロセスです。 MemTables は、データがRocksDBに保存される最初のレベルです。

customMetrics.rocksdbZipFileBytesUncompressed

ファイルマネージャーによって報告される非圧縮zipファイルのサイズ(バイト単位)。 ファイルマネージャーは、SSTファイルの物理ディスク容量の使用と削除を管理します。

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

チェックポイントの場所に保存されている最新バージョンの RocksDB スナップショット。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリクスは特定のパーティション ID と状態ストア名に適用されます。

ソース オブジェクト (Kafka)

メトリクス

説明

sources.description

Kafka ソースの詳細な説明で、読み取られる正確な Kafka トピックを指定します。 たとえば、 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”のようになります。

sources.startOffset オブジェクト

ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。

sources.endOffset オブジェクト

マイクロバッチによって処理された最後のオフセット。 これは、進行中のマイクロバッチ実行の latestOffset と等しくなる可能性があります。

sources.latestOffset オブジェクト

マイクロバッチによって計算された最新のオフセット。 マイクロバッチ処理では、調整がある場合にすべてのオフセットが処理されない可能性があり、その結果、 endOffsetlatestOffset の差異が発生します。

sources.numInputRows

このソースから処理された入力ローの数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理している速度。

ソース.メトリクス オブジェクト (Kafka)

メトリクス

説明

sources.metrics.avgOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で最新の使用可能なオフセットの背後にあるオフセットの平均数。

sources.metrics.estimatedTotalBytesBehindLatest

クエリ プロセスがサブスクライブされたトピックから消費しなかった推定バイト数。

sources.metrics.maxOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。

sources.metrics.minOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。

sink オブジェクト (Kafka)

メトリクス

説明

sink.description

ストリーミング クエリが書き込んでいる Kafka シンクの説明 (使用されている特定の Kafka シンクの実装について詳しく説明します)。 たとえば、 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”のようになります。

sink.numOutputRows

マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値は "-1" になることがあり、通常は "不明" と解釈できます。

ソース オブジェクト (Delta Lake)

メトリクス

説明

sources.description

ストリーミング クエリの読み取り元となるソースの説明。 たとえば、 “DeltaSource[table]”のようになります。

sources.[startOffset/endOffset].sourceVersion

このオフセットのエンコードに使用するシリアル化のバージョン。

sources.[startOffset/endOffset].reservoirId

読み取られるテーブルの ID。 これは、クエリの再開時の設定ミスを検出するために使用されます。 Map Unity Catalog、Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子を参照してください。

sources.[startOffset/endOffset].reservoirVersion

現在処理中のテーブルのバージョン。

sources.[startOffset/endOffset].index

このバージョンの AddFiles のシーケンスのインデックス。 これは、大きなコミットを複数のバッチに分割するために使用されます。 このインデックスは、 modificationTimestamppathで並べ替えることによって作成されます。

sources.[startOffset/endOffset].isStartingVersion

現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを識別します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが最初に処理され、次に新しいデータが到着します。

sources.latestOffset

マイクロバッチ クエリによって処理された最新のオフセット。

sources.numInputRows

このソースから処理された入力ローの数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理している速度。

sources.metrics.numBytesOutstanding

未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。

sources.metrics.numFilesOutstanding

処理すべき未処理ファイルの数。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。

シンク オブジェクト (Delta Lake)

メトリクス

説明

sink.description

Deltaシンクの説明。使用されている特定の Delta シンクの実装について詳しく説明します。たとえば、 “DeltaSink[table]”のようになります。

sink.numOutputRows

Spark は DSv1 シンク (Delta Lake シンクの分類) の出力行を推測できないため、行数は常に "-1" です。

ソース オブジェクト (Kinesis)

メトリクス

説明

description

Kinesis ソースの説明で、ストリーミングクエリが読み取っている正確な Kinesis ストリームを指定します。 たとえば、 “KinesisV2[stream]”のようになります。

ソース メトリクス object (Kinesis)

メトリクス

説明

avgMsBehindLatest

コンシューマがストリームの先頭から遅れた平均ミリ秒数。

maxMsBehindLatest

コンシューマがストリームの先頭から遅れた最大ミリ秒数。

minMsBehindLatest

コンシューマがストリームの先頭から遅れた最小ミリ秒数。

totalPrefetchedBytes

処理に残っているバイト数。 これは、ソースとしての Kinesis のバックログメトリクスです。

詳細については、「Kinesis で報告されるメトリクス」を参照してください。

例 Kafka-to-Kafka StreamingQueryListener イベント

Python
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}

Delta Lake から Delta Lake への StreamingQueryListener イベントの例

Python
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}

キネシスからデルタレイクへのストリーミングクエリリスナーイベントの例

Python
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}

Kafka+Delta Lake 間の StreamingQueryListener イベントの例

Python
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}

Delta Lake StreamingQueryListener イベントへのレート ソースの例

Python
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}