Databricksでの構造化ストリーミングクエリーの監視

Databricks には、 [ストリーミング] タブの Spark UI を使用して、構造化ストリーミング アプリケーション用の組み込みの監視が用意されています。

構造化ストリーミング クエリを Spark UIで区別する

writeStream コードに .queryName(<query-name>) を追加してストリームに一意のクエリー名を付け、Spark UI でどのメトリクスがどのストリームに属しているかを簡単に区別できるようにします。

構造化ストリーミングメトリクスを外部サービスにプッシュする

ストリーミング メトリックはApache Sparkのストリーミング Query Listener インターフェイスを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、ストリーミング クエリ リスナーは Python と Scala で利用できます。

重要

Unity Catalog によって管理される資格情報とオブジェクトは、 StreamingQueryListener ロジックでは使用できません。

リスナーでの処理レイテンシーは、クエリの処理速度に大きく影響する可能性があります。 効率性を高めるために、これらのリスナーでの処理ロジックを制限し、Kafka などの高速応答システムへの書き込みを選択することをお勧めします。

次のコードは、リスナーを実装するための構文の基本的な例を示しています。

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

構造化ストリーミングでの監視可能なメトリクスの定義

Observable メトリクスは、クエリー (DataFrame) で定義できる任意の集計関数という名前です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリーが終了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが生成されます。

これらのメトリクスは、リスナーを Spark セッションにアタッチすることで確認できます。 リスナーは実行モードによって異なります。

  • バッチ モード: QueryExecutionListenerを使用します。

    QueryExecutionListener は、クエリーの完了時に呼び出されます。 QueryExecution.observedMetrics マップを使用してメトリクスにアクセスします。

  • ストリーミング、またはマイクロバッチ: StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetrics マップを使用してメトリクスにアクセスします。Databricks は、連続実行ストリーミングをサポートしていません。

例:

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener object メトリクス

メトリクス

説明

id

再起動後も保持される一意のクエリ ID。

runId

起動/再起動ごとに一意のクエリ ID。 StreamingQuery.runId()を参照してください。

name

クエリのユーザー指定の名前。 名前が指定されていない場合、名前は null です。

timestamp

マイクロバッチの実行のタイムスタンプ。

batchId

現在処理中のデータ バッチの一意の ID。 失敗後の再試行の場合、指定されたバッチ ID が複数回実行されることがあります。 同様に、処理するデータがない場合、バッチ ID は増加しません。

numInputRows

トリガーで処理されたレコードの合計数(すべてのソースにわたって)。

inputRowsPerSecond

到着データの集計(すべてのソースにわたる)レート。

processedRowsPerSecond

Spark がデータを処理している合計 (すべてのソースにわたる) 速度。

期間Ms オブジェクト

マイクロバッチ実行プロセスのさまざまな段階を完了するのにかかる時間に関する情報。

メトリクス

説明

durationMs.addBatch

マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチを計画するのにかかる時間は含まれません。

durationMs.getBatch

ソースからオフセットに関するメタデータを取得するのにかかる時間。

durationMs.latestOffset

マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかる時間を示します。

durationMs.queryPlanning

実行プランの生成にかかった時間。

durationMs.triggerExecution

マイクロバッチの計画と実行にかかる時間。

durationMs.walCommit

新しく利用可能なオフセットをコミットするのにかかる時間。

イベント時間オブジェクト

マイクロバッチで処理されているデータ内に表示されるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフル集計を処理するために状態をトリミングする方法を判断するためにウォーターマークによって使用されます。

メトリクス

説明

eventTime.avg

そのトリガーで確認された平均イベント時間。

eventTime.max

そのトリガーで確認された最大イベント時間。

eventTime.min

そのトリガーで確認された最小イベント時間。

eventTime.watermark

そのトリガーで使用されるウォーターマークの値。

状態演算子オブジェクト

構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報。

メトリクス

説明

stateOperators.operatorName

メトリックが関連するステートフル オペレーターの名前 ( symmetricHashJoindedupestateStoreSaveなど)。

stateOperators.numRowsTotal

ステートフルな演算子または集計の結果としての状態にある行の合計数。

stateOperators.numRowsUpdated

ステートフルな演算子または集計の結果として state で更新された行の合計数。

stateOperators.allUpdatesTimeMs

このメトリックは現在Sparkでは測定できず、将来のアップデートで削除される予定です。

stateOperators.numRowsRemoved

ステートフルな演算子または集計の結果として状態から削除された行の合計数。

stateOperators.allRemovalsTimeMs

このメトリックは現在Sparkでは測定できず、将来のアップデートで削除される予定です。

stateOperators.commitTimeMs

すべての更新 (追加と削除) をコミットして新しいバージョンを返すのにかかる時間。

stateOperators.memoryUsedBytes

状態ストアによって使用されるメモリ。

stateOperators.numRowsDroppedByWatermark

ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ: 集計後に削除された行数 (生の入力行ではありません)。 この数値は正確ではありませんが、遅延データがドロップされていることを示します。

stateOperators.numShufflePartitions

このステートフル演算子のシャッフル パーティションの数。

stateOperators.numStateStoreInstances

オペレーターが初期化して維持している実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じです。 ただし、ストリーム ストリーム結合では、パーティションごとに 4 つの状態ストア インスタンスが初期化されます。

stateOperators.customMetrics オブジェクト

構造化ストリーミング ジョブ用に維持されているステートフル値に関するパフォーマンスと操作に関するメトリックをキャプチャするRocksDBから収集された情報。 詳細については、 「 RocksDBでの ステートの構成」をDatabricks 参照してください。

メトリクス

説明

customMetrics.rocksdbBytesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたバイト数。

customMetrics.rocksdbCommitCheckpointLatency

ネイティブ RocksDB のスナップショットを取得してローカル ディレクトリに書き込むのにかかる時間 (ミリ秒)。

customMetrics.rocksdbCompactLatency

チェックポイントコミット中の圧縮時間(ミリ秒単位)(オプション)。

customMetrics.rocksdbCommitFileSyncLatencyMs

ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期するのにかかる時間 (ミリ秒)。

customMetrics.rocksdbCommitFlushLatency

RocksDB のメモリ内の変更をローカル ディスクにフラッシュするのにかかる時間 (ミリ秒)。

customMetrics.rocksdbCommitPauseLatency

圧縮などのチェックポイントコミットの一部としてバックグラウンドワーカースレッドを停止する時間(ミリ秒)。

customMetrics.rocksdbCommitWriteBatchLatency

メモリ内構造 ( WriteBatch ) のステージングされた書き込みをネイティブ RocksDB に適用するのにかかる時間 (ミリ秒)。

customMetrics.rocksdbFilesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたファイルの数。

customMetrics.rocksdbFilesReused

RocksDB ファイル マネージャーによって追跡された再利用されたファイルの数。

customMetrics.rocksdbGetCount

DB へのget呼び出しの数 ( WriteBatchからのgetsは含まれません - ステージング書き込みに使用されるメモリ内バッチ)。

customMetrics.rocksdbGetLatency

基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。

customMetrics.rocksdbReadBlockCacheHitCount

ローカル ディスクの読み取りを回避するのに役立つ、RocksDB のブロック キャッシュからのキャッシュ ヒットの数。

customMetrics.rocksdbReadBlockCacheMissCount

RocksDB のブロック キャッシュの数は、ローカル ディスクの読み取りを回避するのに役立ちません。

customMetrics.rocksdbSstFileSize

すべての Static Sorted Table (SST) ファイルのサイズ。これは、RocksDB がデータを保存するために使用する表形式の構造です。

customMetrics.rocksdbTotalBytesRead

get操作によって読み取られた非圧縮バイト数。

customMetrics.rocksdbTotalBytesReadByCompaction

コンパクション・プロセスがディスクから読み取るバイト数。

customMetrics.rocksdbTotalBytesReadThroughIterator

イテレータを使用して読み取られた非圧縮データの合計バイト数。 一部のステートフルな操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマークなど) では、イテレータを使用して DB 内のデータを読み取る必要があります。

customMetrics.rocksdbTotalBytesWritten

put操作によって書き込まれた圧縮されていないバイトの合計数。

customMetrics.rocksdbTotalBytesWrittenByCompaction

コンパクション・プロセスがディスクに書き込む合計バイト数。

customMetrics.rocksdbTotalCompactionLatencyMs

バックグラウンド圧縮とコミット中に開始されるオプションの圧縮を含む、RocksDB 圧縮にかかる時間 (ミリ秒)。

customMetrics.rocksdbTotalFlushLatencyMs

バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、 MemTable がいっぱいになったときにストレージにフラッシュするプロセスです。 MemTablesは、RocksDB でデータが保存される最初のレベルです。

customMetrics.rocksdbZipFileBytesUncompressed

ファイルマネージャによって報告された非圧縮zipファイルのサイズ(バイト単位)。 ファイル・マネージャーは、物理 SST ファイルのディスク・スペースの使用率と削除を管理します。

ソースオブジェクト (Kafka)

メトリクス

説明

sources.description

読み取られる正確な Kafka トピックを指定する、Kafka ソースの詳細な説明。 たとえば、 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”です。

sources.startOffset オブジェクト

ストリーミング ジョブが開始された Kafka トピック内の開始オフセット番号。

sources.endOffset オブジェクト

マイクロバッチによって処理された最後のオフセット。 これは、進行中のマイクロバッチ実行の latestOffset と等しくなる可能性があります。

sources.latestOffset オブジェクト

マイクロバッチによって計算された最新のオフセット。 マイクロバッチ処理プロセスでは、調整がある場合にすべてのオフセットが処理されない可能性があり、その結果、 endOffsetlatestOffset が異なります。

sources.numInputRows

このソースから処理された入力行の数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理する速度。

ソース.メトリクス object (Kafka)

メトリクス

説明

sources.metrics.avgOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされているすべてのトピックの中で最新の利用可能なオフセットより遅れているオフセットの平均数。

sources.metrics.estimatedTotalBytesBehindLatest

クエリプロセスがサブスクライブされたトピックから消費しなかった推定バイト数。

sources.metrics.maxOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされているすべてのトピックの中で最新の利用可能なオフセットより遅れているオフセットの最大数。

sources.metrics.minOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされているすべてのトピックの中で最新の利用可能なオフセットより遅れているオフセットの最小数。

シンクオブジェクト (Kafka)

メトリクス

説明

sink.description

ストリーミング クエリが書き込む Kafka シンクの説明。使用されている特定の Kafka シンク実装の詳細を示します。 たとえば、 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”です。

sink.numOutputRows

マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値は "-1" になり、通常は "不明" と解釈されます。

ソースオブジェクト (Delta Lake)

メトリクス

説明

sources.description

ストリーミング クエリの読み取り元となるソースの説明。 たとえば、 “DeltaSource[table]”です。

sources.[startOffset/endOffset].sourceVersion

このオフセットがエンコードされるシリアル化のバージョン。

sources.[startOffset/endOffset].reservoirId

読み取られるテーブルの ID。 これは、クエリの再起動時に設定ミスを検出するために使用されます。

sources.[startOffset/endOffset].reservoirVersion

現在処理中のテーブルのバージョン。

sources.[startOffset/endOffset].index

このバージョンでの AddFiles シーケンスのインデックス。 これは、大きなコミットを複数のバッチに分割するために使用されます。 このインデックスは、 modificationTimestamppathで並べ替えることによって作成されます。

sources.[startOffset/endOffset].isStartingVersion

現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを識別します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが最初に処理され、次に到着する新しいデータが処理されます。

sources.latestOffset

マイクロバッチクエリによって処理された最新のオフセット。

sources.numInputRows

このソースから処理された入力行の数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理する速度。

sources.metrics.numBytesOutstanding

未処理ファイル (RocksDB によって追跡されるファイル) の合計サイズ。 これは、ストリーミング ソースとしてのDeltaおよびAuto Loaderのバックログ メトリックです。

sources.metrics.numFilesOutstanding

処理する未処理のファイルの数。 これは、ストリーミング ソースとしてのDeltaおよびAuto Loaderのバックログ メトリックです。

シンクオブジェクト (Delta Lake)

メトリクス

説明

sink.description

Delta シンクの説明。使用されている特定の Delta シンクの実装について詳しく説明します。 たとえば、 “DeltaSink[table]”です。

sink.numOutputRows

Spark は Delta Lake シンクの分類である DSv1 シンクの出力行を推測できないため、行数は常に「-1」になります。

ソースオブジェクト (Kinesis)

メトリクス

説明

description

ストリーミング クエリが読み取る Kinesis ストリームを正確に指定する Kinesis ソースの説明。 たとえば、 “KinesisV2[stream]”です。

ソースメトリクスオブジェクト (Kinesis)

メトリクス

説明

avgMsBehindLatest

ストリームの開始からコンシューマーが遅れている平均ミリ秒数。

maxMsBehindLatest

コンシューマーがストリームの開始から遅れた最大ミリ秒数。

minMsBehindLatest

ストリームの開始からコンシューマーが遅れている最小ミリ秒数。

totalPrefetchedBytes

処理に残っているバイト数。 これは、ソースとしてのKinesisのバックログ メトリックです。

詳細については、「 Kinesis がレポートするメトリクス」を参照してください。

例:

Kafka 間ストリーミングクエリリスナーイベントの例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

例 Delta レイクからDelta レイクへのストリーミングクエリリスナーイベント

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis から Delta Lake への StreamingQueryListener イベントの例

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake から Delta Lake への StreamingQueryListener イベントの例

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lakeストリーミングクエリリスナーイベント レートソースの例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}