メインコンテンツまでスキップ

モニタリング 構造化ストリーミング クエリ on Databricks

は、 ストリーミング タブの下のDatabricks を通じて、構造化ストリーミング アプリケーションの組み込み モニタリングを提供します。Spark UI

Spark UI での構造化ストリーミング クエリの区別

writeStream コードに .queryName(<query-name>) を追加してストリームに一意のクエリ名を付けると、 Spark UI内のどのメトリクスがどのストリームに属しているかを簡単に区別できます。

Push 構造化ストリーミング メトリクスを外部サービス

ストリーミングメトリクスは、Apache Spark のストリーミングクエリリスナーインターフェースを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。Databricks Runtime 11.3 LTS 以降では、 StreamingQueryListener を Python と Scala で使用できます。

important

Unity Catalog 対応のコンピュート アクセス モードを使用するワークロードには、次の制限が適用されます。

  • StreamingQueryListener Databricks Runtime 15.1 以降で、認証情報を使用したり、コンピュートの Unity Catalog によって管理されるオブジェクトと対話したりするには、専用アクセスモードが必要です。
  • StreamingQueryListener 標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードには、Databricks Runtime 16.1 以降が必要です。
注記

リスナーによる処理の遅延は、クエリの処理速度に大きな影響を与える可能性があります。これらのリスナーの処理ロジックを制限し、効率のために Kafka などの高速応答システムに書き込むことを選択することをお勧めします。

次のコードは、リスナーを実装するための構文の基本的な例を示しています。

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}

/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}

/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}

/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

構造化ストリーミングでの観測可能なメトリクスの定義

観測可能なメトリクスは、クエリ (データフレーム) で定義できる任意の集計関数と呼ばれます。 データフレームの実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが発行されます。

これらのメトリクスは、 Spark セッションにリスナーをアタッチすることで観察できます。 リスナーは実行モードによって異なります。

  • バッチモード : QueryExecutionListenerを使用します。

    QueryExecutionListener は、クエリの完了時に呼び出されます。 QueryExecution.observedMetricsマップを使用してメトリクスにアクセスします。

  • ストリーミングまたはマイクロバッチ : StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetricsマップを使用してメトリクスにアクセスします。Databricks は、ストリーミングの continuous トリガー モードをサポートしていません。

例えば:

Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})

Map Unity Catalog、 Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子

構造化ストリーミング メトリクスは、ストリーミング クエリのソース として使用される Delta テーブルの一意の ID に、いくつかの場所で reservoirId フィールドを使用します。

reservoirId フィールドは、Delta テーブルによって Delta トランザクション・ログに格納されたユニーク ID をマップします。この ID は、 Unity Catalog によって割り当てられ、Catalog Explorer に表示される tableId 値にはマップされません。

次の構文を使用して、Delta テーブルのテーブル識別子を確認します。 これは、 Unity Catalog マネージドテーブル、 Unity Catalog 外部テーブル、およびすべての Hive metastore Delta テーブルで機能します。

SQL
DESCRIBE DETAIL <table-name>

結果に表示される id フィールドは、ストリーミング メトリクスの reservoirId にマッピングされる識別子です。

StreamingQueryListener オブジェクトメトリクス

フィールド

説明

id

再起動後も保持される一意のクエリ ID。

runId

すべての開始/再起動で一意のクエリ ID。StreamingQuery.runId() を参照してください。

name

クエリのユーザー指定の名前。 名前が指定されていない場合、名前は null です。

timestamp

マイクロバッチの実行のタイムスタンプ。

batchId

処理中のデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行される可能性があります。 同様に、処理するデータがない場合、バッチ ID は増分されません。

batchDuration

バッチ操作の処理時間 (ミリ秒単位)。

numInputRows

トリガーで処理されたレコードの合計 (すべてのソースにわたる) 数。

inputRowsPerSecond

到着データの集計 (すべてのソースにわたる) 率。

processedRowsPerSecond

Spark がデータを処理している集計 (すべてのソースにわたる) レート。

StreamingQueryListener また、顧客メトリクスとソースの進行状況の詳細を調べることができるオブジェクトを含む次のフィールドも定義します。

フィールド

説明

durationMs

タイプ: ju.Map[String, JLong]durationMs オブジェクトを参照してください。

eventTime

タイプ: ju.Map[String, String]eventTime オブジェクトを参照してください。

stateOperators

タイプ: Array[StateOperatorProgress]stateOperators オブジェクトを参照してください。

sources

タイプ: Array[SourceProgress]「ソース object」を参照してください。

sink

タイプ: SinkProgressシンク オブジェクトを参照してください。

observedMetrics

タイプ: ju.Map[String, Row]。DataFrame/クエリで定義できる名前付きの任意の集計関数 ( df.observeなど)

durationMs オブジェクト

オブジェクトタイプ : ju.Map[String, JLong]

マイクロバッチ実行プロセスのさまざまなステージを完了するのにかかる時間に関する情報。

フィールド

説明

durationMs.addBatch

マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画にかかる時間は含まれません。

durationMs.getBatch

ソースからオフセットに関するメタデータを取得するのにかかる時間。

durationMs.latestOffset

マイクロバッチで消費された最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を参照します。

durationMs.queryPlanning

実行プランの生成にかかった時間。

durationMs.triggerExecution

マイクロバッチの計画と実行にかかる時間。

durationMs.walCommit

新しく使用可能なオフセットをコミットするのにかかった時間。

durationMs.commitBatch

addBatch中にシンクに書き込まれたデータをコミットするのにかかった時間。コミットをサポートするシンクにのみ存在します。

durationMs.commitOffsets

バッチをコミット ログにコミットするのにかかった時間。

eventTime オブジェクト

オブジェクトタイプ : ju.Map[String, String]

マイクロバッチで処理されているデータ内で見られるイベント時間値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されたステートフルな集計を処理するための状態をトリミングする方法を理解するために、ウォーターマークによって使用されます。

フィールド

説明

eventTime.avg

そのトリガーで確認された平均イベント時間。

eventTime.max

そのトリガーで表示される最大イベント時間。

eventTime.min

そのトリガーで表示される最小イベント時間。

eventTime.watermark

そのトリガーで使用されるウォーターマークの値。

stateOperators オブジェクト

オブジェクトタイプ : Array[StateOperatorProgress] stateOperators オブジェクトには、構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報が含まれています。

ストリーム状態演算子の詳細については、「 ステートフル ストリーミングとは」を参照してください。

フィールド

説明

stateOperators.operatorName

メトリクスが関連するステートフル演算子の名前 ( symmetricHashJoindedupestateStoreSaveなど)

stateOperators.numRowsTotal

ステートフルな演算子または集計の結果としての状態の行の合計数。

stateOperators.numRowsUpdated

ステートフルな演算子または集計の結果として state で更新された行の合計数。

stateOperators.allUpdatesTimeMs

このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。

stateOperators.numRowsRemoved

ステートフルな演算子または集計の結果として state から削除された行の合計数。

stateOperators.allRemovalsTimeMs

このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。

stateOperators.commitTimeMs

すべての更新 (put と remove) をコミットし、新しいバージョンを返すのにかかる時間。

stateOperators.memoryUsedBytes

状態ストアが使用するメモリです。

stateOperators.numRowsDroppedByWatermark

ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ : 集計後に削除された行の数 (未加工の入力行ではない)。 この数値は正確ではありませんが、遅延データがドロップされていることを示しています。

stateOperators.numShufflePartitions

このステートフル オペレーターのシャッフル パーティションの数。

stateOperators.numStateStoreInstances

オペレーターが初期化して保守した実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じです。 ただし、ストリーム-ストリーム結合は、パーティションごとに 4 つの状態ストア インスタンスを初期化します。

stateOperators.customMetrics

詳細については、このトピックの stateOperators.customMetrics を参照してください。

StateOperatorProgress.customMetrics オブジェクト

オブジェクトタイプ : ju.Map[String, JLong]

StateOperatorProgress には、 customMetricsというフィールドがあり、これには、メトリクスを収集するときに使用している機能に固有のメトリクスが含まれています。

機能

説明

RocksDB 状態ストア

メトリクス for RocksDB 状態ストア.

HDFS 状態ストア

メトリクス for HDFS 状態ストア.

ストリーム重複排除

行重複排除のメトリクス

ストリーム集約

メトリクス for row aggregation.

ストリーム結合演算子

メトリクス for ストリーム join operator.

transformWithState

メトリクス for transformWithState operator.

RocksDB 状態ストア custom メトリクス

RocksDB Capturing メトリクスから収集された、構造化ストリーミング ジョブに対して保持するステートフルな値に関するパフォーマンスと操作に関する情報。詳細については、「RocksDBでの 状態ストアの構成Databricks 」を参照してください。

フィールド

説明

customMetrics.rocksdbBytesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたバイト数。

customMetrics.rocksdbCommitCheckpointLatency

ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。

customMetrics.rocksdbCompactLatency

チェックポイントのコミット中の圧縮時間 (ミリ秒単位) (オプション)。

customMetrics.rocksdbCommitCompactLatency

コミット中の圧縮時間 (ミリ秒単位)。

customMetrics.rocksdbCommitFileSyncLatencyMs

ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。

customMetrics.rocksdbCommitFlushLatency

RocksDB インメモリをフラッシュする時間 (ミリ秒単位) がローカル ディスクに変わります。

customMetrics.rocksdbCommitPauseLatency

チェックポイント・コミットの一部としてバックグラウンド・ワーカー・スレッドを停止している時間 (ミリ秒単位) (コンパクションなど)。

customMetrics.rocksdbCommitWriteBatchLatency

ステージングされた書き込みをインメモリ構造 (WriteBatch) でネイティブ RocksDB に適用する時間 (ミリ秒単位)。

customMetrics.rocksdbFilesCopied

RocksDB ファイル マネージャーによって追跡されたコピーされたファイルの数。

customMetrics.rocksdbFilesReused

RocksDB ファイル マネージャーによって追跡された再利用されたファイルの数。

customMetrics.rocksdbGetCount

get呼び出しの数 (WriteBatch からの gets は含まれません - ステージング書き込みに使用されるメモリ内バッチ)。

customMetrics.rocksdbGetLatency

基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。

customMetrics.rocksdbReadBlockCacheHitCount

RocksDB のブロック キャッシュからのキャッシュ ヒット数。

customMetrics.rocksdbReadBlockCacheMissCount

RocksDBでのブロックキャッシュミスの数。

customMetrics.rocksdbSstFileSize

RocksDB インスタンス内のすべての静的ソートテーブル (SST) ファイルのサイズ。

customMetrics.rocksdbTotalBytesRead

get操作によって読み取られた非圧縮バイト数。

customMetrics.rocksdbTotalBytesWritten

put操作によって書き込まれた非圧縮バイトの合計数。

customMetrics.rocksdbTotalBytesReadThroughIterator

イテレータを使用して読み取られた非圧縮データの合計バイト数。一部のステートフル操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマークなど) では、イテレーターを介してデータを Databricks に読み取る必要があります。

customMetrics.rocksdbTotalBytesReadByCompaction

コンパクション・プロセスがディスクから読み取るバイト数。

customMetrics.rocksdbTotalBytesWrittenByCompaction

圧縮プロセスがディスクに書き込む合計バイト数。

customMetrics.rocksdbTotalCompactionLatencyMs

RocksDB コンパクションの時間 (ミリ秒単位) (バックグラウンドコンパクションとコミット中に開始されたオプションのコンパクションを含む)。

customMetrics.rocksdbTotalFlushLatencyMs

バックグラウンド フラッシュを含む合計フラッシュ時間。フラッシュ操作は、 MemTable がいっぱいになったときにストレージにフラッシュされるプロセスです。MemTables は、データがRocksDBに保存される最初のレベルです。

customMetrics.rocksdbZipFileBytesUncompressed

ファイルマネージャーによって報告される非圧縮zipファイルのサイズ(バイト単位)。 ファイルマネージャーは、SSTファイルの物理ディスク容量の使用と削除を管理します。

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

チェックポイントの場所に保存されている最新バージョンの RocksDB スナップショット。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリクスは特定のパーティション ID と状態ストア名に適用されます。

customMetrics.rocksdbPutLatency

put call の合計レイテンシ。

customMetrics.rocksdbPutCount

put 呼び出しの数。

customMetrics.rocksdbWriterStallLatencyMs

ライターは、コンパクションまたはフラッシュが完了するまでの待機時間です。

customMetrics.rocksdbTotalBytesWrittenByFlush

flush によって書き込まれた合計バイト数

customMetrics.rocksdbPinnedBlocksMemoryUsage

ピン留めブロックのメモリ使用量

customMetrics.rocksdbNumInternalColFamiliesKeys

内部カラム・ファミリの内部キーの数

customMetrics.rocksdbNumExternalColumnFamilies

外部列ファミリの数

customMetrics.rocksdbNumInternalColumnFamilies

内部列ファミリの数

HDFS 状態ストア custom メトリクス

HDFS状態ストア プロバイダーの動作と操作について収集された情報。

フィールド

説明

customMetrics.stateOnCurrentVersionSizeBytes

現在のバージョンのみの状態の推定サイズ。

customMetrics.loadedMapCacheHitCount

プロバイダーにキャッシュされた状態でヒットしたキャッシュの数。

customMetrics.loadedMapCacheMissCount

プロバイダーにキャッシュされた状態のキャッシュミスの数。

customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name>

特定の状態ストア インスタンスのスナップショットの最後にアップロードされたバージョン。

重複排除のカスタムメトリクス

重複排除の動作と操作に関して収集された情報。

フィールド

説明

customMetrics.numDroppedDuplicateRows

削除された重複行の数。

customMetrics.numRowsReadDuringEviction

ステートの削除中に読み取られたステートの行の数。

集計カスタムメトリクス

集計の動作と操作に関して収集された情報。

フィールド

説明

customMetrics.numRowsReadDuringEviction

ステートの削除中に読み取られたステートの行の数。

ストリーム結合カスタムメトリクス

ストリーム結合の動作と操作に関して収集された情報。

フィールド

説明

customMetrics.skippedNullValueCount

スキップされた null の値の数 ( spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabledtrueに設定されている場合)。

transformWithState カスタムメトリクス

transformWithState(TWS)の動作と操作について収集された情報。transformWithStateの詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。

フィールド

説明

customMetrics.initialStateProcessingTimeMs

すべての初期状態を処理するのにかかったミリ秒数。

customMetrics.numValueStateVars

値の状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numListStateVars

リスト状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numMapStateVars

マップ状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numDeletedStateVars

削除された状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.timerProcessingTimeMs

すべてのタイマーの処理にかかったミリ秒数

customMetrics.numRegisteredTimers

登録されたタイマーの数。transformWithStateInPandasにも存在します。

customMetrics.numDeletedTimers

削除されたタイマーの数。transformWithStateInPandasにも存在します。

customMetrics.numExpiredTimers

期限切れのタイマーの数。transformWithStateInPandasにも存在します。

customMetrics.numValueStateWithTTLVars

TTL を持つ値の状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numListStateWithTTLVars

TTL を持つリスト状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numMapStateWithTTLVars

TTL を持つマップ状態変数の数。transformWithStateInPandasにも存在します。

customMetrics.numValuesRemovedDueToTTLExpiry

TTL の有効期限が切れたために削除された値の数。transformWithStateInPandasにも存在します。

customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry

TTL の有効期限が切れたために段階的に削除された値の数。

ソース オブジェクト

オブジェクトタイプ : Array[SourceProgress]

sources オブジェクトには、ストリーミングデータソースの情報とメトリクスが含まれています。

フィールド

説明

description

ストリーミング データソース テーブルの詳細な説明。

startOffset

ストリーミング ジョブが開始されたデータソース テーブル内の開始オフセット番号。

endOffset

マイクロバッチによって処理された最後のオフセット。

latestOffset

マイクロバッチによって処理された最新のオフセット。

numInputRows

このソースから処理された入力ローの数。

inputRowsPerSecond

このソースから処理するためにデータが到着する速度 (秒単位)。

processedRowsPerSecond

Spark がこのソースからのデータを処理している速度。

metrics

タイプ: ju.Map[String, String]。特定のデータソースのカスタムメトリクスが含まれます。

Databricks には、次のソース オブジェクトの実装が用意されています。

注記

sources.<startOffset / endOffset / latestOffset>.* の形式 (または何らかのバリエーション) で定義されたフィールドの場合、それを (最大 3 つの) 可能なフィールドのいずれかとして解釈し、すべてに指定された子フィールドが含まれています。

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake ソース オブジェクト

Delta table ストリーミング データソースに使用されるカスタムメトリクスの定義。

フィールド

説明

sources.description

ストリーミング クエリの読み取り元となるソースの説明。 たとえば、 “DeltaSource[table]”のようになります。

sources.<startOffset / endOffset>.sourceVersion

このオフセットのエンコードに使用するシリアル化のバージョン。

sources.<startOffset / endOffset>.reservoirId

読み取られるテーブルの ID。 これは、クエリの再開時の設定ミスを検出するために使用されます。 Map Unity Catalog、Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子を参照してください。

sources.<startOffset / endOffset>.reservoirVersion

現在処理中のテーブルのバージョン。

sources.<startOffset / endOffset>.index

このバージョンの AddFiles のシーケンスのインデックス。 これは、大きなコミットを複数のバッチに分割するために使用されます。 このインデックスは、 modificationTimestamppathで並べ替えることによって作成されます。

sources.<startOffset / endOffset>.isStartingVersion

現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを識別します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが最初に処理され、次に新しいデータが到着します。

sources.<startOffset / endOffset / latestOffset>.eventTimeMillis

イベント時間の順序として記録されたイベント時間。処理が保留中の初期スナップショット データのイベント時刻。イベント時間順の初期スナップショットを処理するときに使用されます。

sources.latestOffset

マイクロバッチ クエリによって処理された最新のオフセット。

sources.numInputRows

このソースから処理された入力ローの数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理している速度。

sources.metrics.numBytesOutstanding

未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。

sources.metrics.numFilesOutstanding

処理すべき未処理ファイルの数。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。

Apache Kafka ソースオブジェクト

Apache Kafka ストリーミング データソースに使用されるカスタム メトリクスの定義。

フィールド

説明

sources.description

Kafka ソースの詳細な説明で、読み取られる正確な Kafka トピックを指定します。 たとえば、 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”のようになります。

sources.startOffset

ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。

sources.endOffset

マイクロバッチによって処理された最後のオフセット。 これは、進行中のマイクロバッチ実行の latestOffset と等しくなる可能性があります。

sources.latestOffset

マイクロバッチによって計算された最新のオフセット。マイクロバッチ処理では、調整がある場合にすべてのオフセットが処理されない可能性があり、その結果、 endOffsetlatestOffset が異なる場合があります。

sources.numInputRows

このソースから処理された入力ローの数。

sources.inputRowsPerSecond

このソースから処理のためにデータが到着する速度。

sources.processedRowsPerSecond

Spark がこのソースからのデータを処理している速度。

sources.metrics.avgOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で最新の使用可能なオフセットの背後にあるオフセットの平均数。

sources.metrics.estimatedTotalBytesBehindLatest

クエリ プロセスがサブスクライブされたトピックから消費しなかった推定バイト数。

sources.metrics.maxOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。

sources.metrics.minOffsetsBehindLatest

ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。

Auto Loader ソース メトリクス

Auto Loader ストリーミング データソースに使用されるカスタム メトリクスの定義。

フィールド

説明

sources.<startOffset / endOffset / latestOffset>.seqNum

処理中のファイルのシーケンス内の現在位置 (検出された順序で)。

sources.<startOffset / endOffset / latestOffset>.sourceVersion

cloudFiles ソースの実装バージョン。

sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs

最新のバックフィル操作の開始時刻。

sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs

最新のバックフィル操作の終了時刻。

sources.<startOffset / endOffset / latestOffset>.lastInputPath

ストリームが再開される前の、ユーザーが指定したストリームの最後の入力パス。

sources.metrics.numFilesOutstanding

バックログ内のファイルの数

sources.metrics.numBytesOutstanding

バックログ内のファイルのサイズ (バイト)

sources.metrics.approximateQueueSize

メッセージ・キューのおおよそのサイズ。cloudFiles.useNotifications オプションが有効な場合のみ。

PubSub ソースのメトリクス

PubSub ストリーミング データソースに使用されるカスタム メトリクスの定義。 モニタリング PubSub ストリーミング ソースの詳細については、「 モニタリング ストリーミング メトリクス」を参照してください。

フィールド

説明

sources.<startOffset / endOffset / latestOffset>.sourceVersion

このオフセットがエンコードされる実装バージョン。

sources.<startOffset / endOffset / latestOffset>.seqNum

処理中の永続化されたシーケンス番号。

sources.<startOffset / endOffset / latestOffset>.fetchEpoch

処理中の最大のフェッチ エポック。

sources.metrics.numRecordsReadyToProcess

現在のバックログで処理可能なレコードの数。

sources.metrics.sizeOfRecordsReadyToProcess

現在のバックログ内の未処理データの合計サイズ (バイト単位)。

sources.metrics.numDuplicatesSinceStreamStart

ストリームの開始以降にストリームによって処理された重複レコードの合計数。

Pulsar ソースのメトリクス

Pulsar ストリーミング データソースに使用されるカスタム メトリクスの定義。

フィールド

説明

sources.metrics.numInputRows

現在のマイクロバッチで処理された行の数。

sources.metrics.numInputBytes

現在のマイクロバッチで処理された合計バイト数。

sink オブジェクト

オブジェクトタイプ : SinkProgress

フィールド

説明

sink.description

シンクの説明。使用されている特定のシンクの実装について詳しく説明します。

sink.numOutputRows

出力ローの数。シンクの種類が異なれば、値の動作や制限も異なる場合があります。サポートされている特定のタイプを参照してください

sink.metrics

ju.Map[String, String] of sink メトリクス.

現在、Databricks では、次の 2 つの特定の DATA sink オブジェクト実装を提供しています。

シンクタイプ

詳細

Delta テーブル

Delta シンク オブジェクトを参照してください。

Apache Kafka トピック

Kafka シンクオブジェクトを参照してください。

sink.metrics フィールドは、sink オブジェクトの両方のバリアントで同じように動作します。

Delta Lake シンク オブジェクト

フィールド

説明

sink.description

Deltaシンクの説明。使用されている特定の Delta シンクの実装について詳しく説明します。たとえば、 “DeltaSink[table]”のようになります。

sink.numOutputRows

Spark は DSv1 シンク (Delta Lake シンクの分類) の出力行を推測できないため、行数は常に -1 です。

Apache Kafka シンク オブジェクト

フィールド

説明

sink.description

ストリーミング クエリが書き込んでいる Kafka シンクの説明 (使用されている特定の Kafka シンクの実装について詳しく説明します)。 たとえば、 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”のようになります。

sink.numOutputRows

マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値は "-1" になることがあり、通常は "不明" と解釈できます。

例 Kafka-to-Kafka StreamingQueryListener イベント

Python
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}

Delta Lake から Delta Lake への StreamingQueryListener イベントの例

Python
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}

キネシスからデルタレイクへのストリーミングクエリリスナーイベントの例

Python
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}

Kafka+Delta Lake 間の StreamingQueryListener イベントの例

Python
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}

Delta Lake StreamingQueryListener イベントへのレート ソースの例

Python
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}