モニタリング 構造化ストリーミング クエリ on Databricks
は、 ストリーミング タブの下のDatabricks を通じて、構造化ストリーミング アプリケーションの組み込み モニタリングを提供します。Spark UI
Spark UI での構造化ストリーミング クエリの区別
writeStream
コードに .queryName(<query-name>)
を追加してストリームに一意のクエリ名を付けると、 Spark UI内のどのメトリクスがどのストリームに属しているかを簡単に区別できます。
Push 構造化ストリーミング メトリクスを外部サービス
ストリーミングメトリクスは、Apache Spark のストリーミングクエリリスナーインターフェースを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、 StreamingQueryListener
を Python と Scala で使用できます。
Unity Catalog 対応のコンピュート アクセス モードを使用するワークロードには、次の制限が適用されます。
StreamingQueryListener
Databricks Runtime 15.1 以降で、認証情報を使用したり、コンピュートの Unity Catalog によって管理されるオブジェクトと対話したりするには、専用アクセスモードが必要です。StreamingQueryListener
標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードには、Databricks Runtime 16.1 以降が必要です。
リスナーによる処理の遅延は、クエリの処理速度に大きな影響を与える可能性があります。 これらのリスナーの処理ロジックを制限し、効率のために Kafka などの高速応答システムに書き込むことを選択することをお勧めします。
次のコードは、リスナーを実装するための構文の基本的な例を示しています。
- Scala
- Python
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
構造化ストリーミングでの観測可能なメトリクスの定義
観測可能なメトリクスは、クエリ (データフレーム) で定義できる任意の集計関数と呼ばれます。 データフレームの実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが発行されます。
これらのメトリクスは、 Spark セッションにリスナーをアタッチすることで観察できます。 リスナーは実行モードによって異なります。
-
バッチモード :
QueryExecutionListener
を使用します。QueryExecutionListener
は、クエリの完了時に呼び出されます。QueryExecution.observedMetrics
マップを使用してメトリクスにアクセスします。 -
ストリーミングまたはマイクロバッチ :
StreamingQueryListener
を使用します。StreamingQueryListener
は、ストリーミング クエリがエポックを完了したときに呼び出されます。StreamingQueryProgress.observedMetrics
マップを使用してメトリクスにアクセスします。Databricks は、ストリーミングのcontinuous
トリガー モードをサポートしていません。
例えば:
- Scala
- Python
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Map Unity Catalog、 Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子
構造化ストリーミング メトリクスは、ストリーミング クエリのソース として使用される Delta テーブルの一意の ID に、いくつかの場所で reservoirId
フィールドを使用します。
reservoirId
フィールドは、Delta テーブルによって Delta トランザクション・ログに格納されたユニーク ID をマップします。この ID は、 Unity Catalog によって割り当てられ、Catalog Explorer に表示される tableId
値にはマップされません。
次の構文を使用して、Delta テーブルのテーブル識別子を確認します。 これは、 Unity Catalog マネージドテーブル、 Unity Catalog 外部テーブル、およびすべての Hive metastore Delta テーブルで機能します。
DESCRIBE DETAIL <table-name>
結果に表示される id
フィールドは、ストリーミング メトリクスの reservoirId
にマッピングされる識別子です。
StreamingQueryListener オブジェクトメトリクス
メトリクス | 説明 |
---|---|
| 再起動後も保持される一意のクエリ ID。 |
| 起動/再起動のたびに固有のクエリID。 StreamingQuery.runId() を参照してください。 |
| クエリのユーザー指定の名前。 名前が指定されていない場合、名前は null です。 |
| マイクロバッチの実行のタイムスタンプ。 |
| 処理中のデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行される可能性があります。 同様に、処理するデータがない場合、バッチ ID は増分されません。 |
| トリガーで処理されたレコードの合計 (すべてのソースにわたる) 数。 |
| 到着データの集計 (すべてのソースにわたる) 率。 |
| Spark がデータを処理している集計 (すべてのソースにわたる) レート。 |
durationMs オブジェクト
マイクロバッチ実行プロセスのさまざまなステージを完了するのにかかる時間に関する情報。
メトリクス | 説明 |
---|---|
| マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画にかかる時間は含まれません。 |
| ソースからオフセットに関するメタデータを取得するのにかかる時間。 |
| マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を参照します。 |
| 実行プランの生成にかかった時間。 |
| マイクロバッチの計画と実行にかかる時間。 |
| 新しく使用可能なオフセットをコミットするのにかかった時間。 |
eventTime オブジェクト
マイクロバッチで処理されているデータ内で見られるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフルな集計を処理するための状態をトリミングする方法を理解するために、ウォーターマークによって使用されます。
メトリクス | 説明 |
---|---|
| そのトリガーで確認された平均イベント時間。 |
| そのトリガーで表示される最大イベント時間。 |
| そのトリガーで表示される最小イベント時間。 |
| そのトリガーで使用されるウォーターマークの値。 |
stateOperators オブジェクト
構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報。
メトリクス | 説明 |
---|---|
| メトリクスが関連するステートフル演算子の名前 ( |
| ステートフルな演算子または集計の結果としての状態の行の合計数。 |
| ステートフルな演算子または集計の結果として state で更新された行の合計数。 |
| このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。 |
| ステートフルな演算子または集計の結果として state から削除された行の合計数。 |
| このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。 |
| すべての更新 (put と remove) をコミットし、新しいバージョンを返すのにかかる時間。 |
| 状態ストアが使用するメモリです。 |
| ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ : 集計後に削除された行の数 (未加工の入力行ではない)。 この数値は正確ではありませんが、遅延データがドロップされていることを示しています。 |
| このステートフル オペレーターのシャッフル パーティションの数。 |
| オペレーターが初期化して保守した実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じです。 ただし、ストリーム-ストリーム結合は、パーティションごとに 4 つの状態ストア インスタンスを初期化します。 |
stateOperators.customMetrics オブジェクト
RocksDB Capturing メトリクスから収集された、構造化ストリーミング ジョブに対して保持するステートフルな値に関するパフォーマンスと操作に関する情報。詳細については、「RocksDBでの 状態ストアの構成Databricks 」を参照してください。
メトリクス | 説明 |
---|---|
| RocksDB ファイル マネージャーによって追跡されたコピーされたバイト数。 |
| ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。 |
| チェックポイントのコミット中の圧縮時間 (ミリ秒単位) (オプション)。 |
| ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。 |
| RocksDB インメモリをフラッシュする時間 (ミリ秒単位) がローカル ディスクに変わります。 |
| チェックポイント・コミットの一部としてバックグラウンド・ワーカー・スレッドを停止している時間 (ミリ秒単位) (コンパクションなど)。 |
| ステージングされた書き込みをインメモリ構造 ( |
| RocksDB ファイル マネージャーによって追跡されたコピーされたファイルの数。 |
| RocksDB ファイル マネージャーによって追跡された再利用されたファイルの数。 |
| DB への |
| 基になるネイティブ |
| RocksDB のブロックキャッシュからのキャッシュヒット数で、ローカルディスクの読み取りを回避するのに役立ちます。 |
| RocksDBのブロックキャッシュの数は、ローカルディスクの読み取りを回避するのに役立ちません。 |
| すべての静的ソートテーブル(SST)ファイルのサイズ - RocksDBがデータを格納するために使用する表形式構造。 |
|
|
| コンパクション・プロセスがディスクから読み取るバイト数。 |
| イテレータを使用して読み取られた非圧縮データの合計バイト数。 一部のステートフル操作 ( |
|
|
| 圧縮プロセスがディスクに書き込む合計バイト数。 |
| RocksDB コンパクションの時間 (ミリ秒単位) (バックグラウンドコンパクションとコミット中に開始されたオプションのコンパクションを含む)。 |
| バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、 |
| ファイルマネージャーによって報告される非圧縮zipファイルのサイズ(バイト単位)。 ファイルマネージャーは、SSTファイルの物理ディスク容量の使用と削除を管理します。 |
| チェックポイントの場所に保存されている最新バージョンの RocksDB スナップショット。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリクスは特定のパーティション ID と状態ストア名に適用されます。 |
ソース オブジェクト (Kafka)
メトリクス | 説明 |
---|---|
| Kafka ソースの詳細な説明で、読み取られる正確な Kafka トピックを指定します。 たとえば、 |
| ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。 |
| マイクロバッチによって処理された最後のオフセット。 これは、進行中のマイクロバッチ実行の |
| マイクロバッチによって計算された最新のオフセット。 マイクロバッチ処理では、調整がある場合にすべてのオフセットが処理されない可能性があり、その結果、 |
| このソースから処理された入力ローの数。 |
| このソースから処理のためにデータが到着する速度。 |
| Spark がこのソースからのデータを処理している速度。 |
ソース.メトリクス オブジェクト (Kafka)
メトリクス | 説明 |
---|---|
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で最新の使用可能なオフセットの背後にあるオフセットの平均数。 |
| クエリ プロセスがサブスクライブされたトピックから消費しなかった推定バイト数。 |
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。 |
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。 |
sink オブジェクト (Kafka)
メトリクス | 説明 |
---|---|
| ストリーミング クエリが書き込んでいる Kafka シンクの説明 (使用されている特定の Kafka シンクの実装について詳しく説明します)。 たとえば、 |
| マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値は "-1" になることがあり、通常は "不明" と解釈できます。 |
ソース オブジェクト (Delta Lake)
メトリクス | 説明 |
---|---|
| ストリーミング クエリの読み取り元となるソースの説明。 たとえば、 |
| このオフセットのエンコードに使用するシリアル化のバージョン。 |
| 読み取られるテーブルの ID。 これは、クエリの再開時の設定ミスを検出するために使用されます。 Map Unity Catalog、Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子を参照してください。 |
| 現在処理中のテーブルのバージョン。 |
| このバージョンの |
| 現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを識別します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが最初に処理され、次に新しいデータが到着します。 |
| マイクロバッチ クエリによって処理された最新のオフセット。 |
| このソースから処理された入力ローの数。 |
| このソースから処理のためにデータが到着する速度。 |
| Spark がこのソースからのデータを処理している速度。 |
| 未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。 |
| 処理すべき未処理ファイルの数。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。 |
シンク オブジェクト (Delta Lake)
メトリクス | 説明 |
---|---|
| Deltaシンクの説明。使用されている特定の Delta シンクの実装について詳しく説明します。たとえば、 |
| Spark は DSv1 シンク (Delta Lake シンクの分類) の出力行を推測できないため、行数は常に "-1" です。 |
ソース オブジェクト (Kinesis)
メトリクス | 説明 |
---|---|
| Kinesis ソースの説明で、ストリーミングクエリが読み取っている正確な Kinesis ストリームを指定します。 たとえば、 |
ソース メトリクス object (Kinesis)
メトリクス | 説明 |
---|---|
| コンシューマがストリームの先頭から遅れた平均ミリ秒数。 |
| コンシューマがストリームの先頭から遅れた最大ミリ秒数。 |
| コンシューマがストリームの先頭から遅れた最小ミリ秒数。 |
| 処理に残っているバイト数。 これは、ソースとしての Kinesis のバックログメトリクスです。 |
詳細については、「Kinesis で報告されるメトリクス」を参照してください。
例
例 Kafka-to-Kafka StreamingQueryListener イベント
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Delta Lake から Delta Lake への StreamingQueryListener イベントの例
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
キネシスからデルタレイクへのストリーミングクエリリスナーイベントの例
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Kafka+Delta Lake 間の StreamingQueryListener イベントの例
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener イベントへのレート ソースの例
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}