モニタリング 構造化ストリーミング クエリ on Databricks
は、 ストリーミング タブの下のDatabricks を通じて、構造化ストリーミング アプリケーションの組み込み モニタリングを提供します。Spark UI
Spark UI での構造化ストリーミング クエリの区別
writeStream コードに .queryName(<query-name>) を追加してストリームに一意のクエリ名を付けると、 Spark UI内のどのメトリクスがどのストリームに属しているかを簡単に区別できます。
Push 構造化ストリーミング メトリクスを外部サービス
ストリーミングメトリクスは、Apache Spark のストリーミングクエリリスナーインターフェースを使用して、アラートやダッシュボードのユースケースのために外部サービスにプッシュできます。Databricks Runtime 11.3 LTS 以降では、 StreamingQueryListener を Python と Scala で使用できます。
Unity Catalog 対応のコンピュート アクセス モードを使用するワークロードには、次の制限が適用されます。
StreamingQueryListenerDatabricks Runtime 15.1 以降で、認証情報を使用したり、コンピュートの Unity Catalog によって管理されるオブジェクトと対話したりするには、専用アクセスモードが必要です。StreamingQueryListener標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードには、Databricks Runtime 16.1 以降が必要です。
リスナーによる処理の遅延は、クエリの処理速度に大きな影響を与える可能性があります。これらのリスナーの処理ロジックを制限し、効率のために Kafka などの高速応答システムに書き込むことを選択することをお勧めします。
クエリにソースで使用可能なデータがなく、新しいデータを待機している場合は、 onQueryIdle メッセージがストリーミング クエリ リスナーに配信されます。onQueryProgress メッセージは、ストリーミング クエリ バッチの終了時にのみ配信されます。クエリが長時間データを処理している場合、 onQueryIdle イベントも onQueryProgress イベントも送信されない可能性がありますが、クエリは正常でデータの処理を続行します。
次のコードは、リスナーを実装するための構文の基本的な例を示しています。
- Scala
- Python
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
構造化ストリーミングでの観測可能なメトリクスの定義
観測可能なメトリクスは、クエリ (データフレーム) で定義できる任意の集計関数と呼ばれます。 データフレームの実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリクスを含む名前付きイベントが発行されます。
これらのメトリクスは、 Spark セッションにリスナーをアタッチすることで観察できます。 リスナーは実行モードによって異なります。
-
バッチモード :
QueryExecutionListenerを使用します。QueryExecutionListenerは、クエリの完了時に呼び出されます。QueryExecution.observedMetricsマップを使用してメトリクスにアクセスします。 -
ストリーミング、またはマイクロバッチ :
StreamingQueryListenerを使用します。StreamingQueryListenerは、ストリーミング クエリがエポックを完了したときに呼び出されます。StreamingQueryProgress.observedMetricsマップを使用してメトリクスにアクセスします。Databricks は、ストリーミングのcontinuousトリガー モードをサポートしていません。
例えば:
- Scala
- Python
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Map Unity Catalog、 Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子
構造化ストリーミングのメトリクスでは、ストリーミングクエリのソースとして使用されるDelta Lakeテーブルの一意の識別子として、複数の箇所でreservoirIdフィールドを使用します。
reservoirId フィールドは、Delta Lake テーブルによって Delta トランザクションログに保存されている一意の識別子をマッピングします。tableIdこのIDは、Unity Catalogによって割り当てられ、カタログエクスプローラに表示される の値にマッピングされません。
Delta Lakeテーブルのテーブル識別子を確認するには、次の構文を使用します。これは、Unity Catalog マネージドテーブル、Unity Catalog 外部テーブル、およびすべてのHive metastore Delta Lake テーブルに適用されます:
DESCRIBE DETAIL <table-name>
結果に表示される id フィールドは、ストリーミング メトリクスの reservoirId にマッピングされる識別子です。
StreamingQueryListener オブジェクトメトリクス
フィールド | 説明 |
|---|---|
| 再起動後も保持される一意のクエリ ID。 |
| すべての開始/再起動で一意のクエリ ID。StreamingQuery.runId() を参照してください。 |
| クエリのユーザー指定の名前。 名前が指定されていない場合、名前は null です。 |
| マイクロバッチの実行タイムスタンプ。 |
| 処理中のデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行される可能性があります。 同様に、処理するデータがない場合、バッチ ID は増分されません。 |
| バッチ操作の処理時間 (ミリ秒単位)。 |
| トリガーで処理されたレコードの合計 (すべてのソースにわたる) 数。 |
| 到着データの集計 (すべてのソースにわたる) 率。 |
| Spark がデータを処理している集計 (すべてのソースにわたる) レート。 |
StreamingQueryListener また、顧客メトリクスとソースの進行状況の詳細を調べることができるオブジェクトを含む次のフィールドも定義します。
フィールド | 説明 |
|---|---|
| タイプ: |
| タイプ: |
| タイプ: |
| タイプ: |
| タイプ: |
| タイプ: |
durationMs オブジェクト
オブジェクトタイプ : ju.Map[String, JLong]
マイクロバッチ実行プロセスのさまざまな段階を完了するのにかかる時間に関する情報。
フィールド | 説明 |
|---|---|
| マイクロバッチの実行にかかる時間。これには、Sparkがマイクロバッチの計画に要する時間は含まれません。 |
| ソースからオフセットに関するメタデータを取得するのにかかる時間。 |
| マイクロバッチで消費された最新のオフセット。この進捗オブジェクトは、ソースから最新のオフセットを取得するのに要した時間を示します。 |
| 実行プランの生成にかかった時間。 |
| マイクロバッチの計画と実行にかかる時間。 |
| 新しく使用可能なオフセットをコミットするのにかかった時間。 |
|
|
| バッチをコミット ログにコミットするのにかかった時間。 |
eventTime オブジェクト
オブジェクトタイプ : ju.Map[String, String]
マイクロバッチで処理されているデータ内で検出されたイベント時間値に関する情報。このデータは、ウォーターマークによって、構造化ストリーミングジョブで定義されたステートフル集計を処理するために、状態をどのようにトリミングするかを判断するために使用されます。
フィールド | 説明 |
|---|---|
| そのトリガーで確認された平均イベント時間。 |
| そのトリガーで表示される最大イベント時間。 |
| そのトリガーで表示される最小イベント時間。 |
| そのトリガーで使用されるウォーターマークの値。 |
stateOperators オブジェクト
オブジェクトタイプ : Array[StateOperatorProgress]
stateOperators オブジェクトには、構造化ストリーミング ジョブで定義されているステートフル操作と、それらから生成される集計に関する情報が含まれています。
ストリーム状態演算子の詳細については、「 ステートフル ストリーミングとは」を参照してください。
フィールド | 説明 |
|---|---|
| メトリクスが関連するステートフル演算子の名前 ( |
| ステートフルな演算子または集計の結果としての状態の行の合計数。 |
| ステートフルな演算子または集計の結果として state で更新された行の合計数。 |
| このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。 |
| ステートフルな演算子または集計の結果として state から削除された行の合計数。 |
| このメトリクスは現在、 Spark で測定できず、今後の更新で削除される予定です。 |
| すべての更新 (put と remove) をコミットし、新しいバージョンを返すのにかかる時間。 |
| 状態ストアが使用するメモリです。 |
| ステートフル集計に含めるには遅すぎると見なされる行の数。 ストリーミング集計のみ : 集計後に削除された行の数 (未加工の入力行ではない)。 この数値は正確ではありませんが、遅延データがドロップされていることを示しています。 |
| このステートフル オペレーターのシャッフル パーティションの数。 |
| オペレーターが初期化して保守した実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じです。 ただし、ストリーム-ストリーム結合は、パーティションごとに 4 つの状態ストア インスタンスを初期化します。 |
| 詳細については、このトピックの stateOperators.customMetrics を参照してください。 |
StateOperatorProgress.customMetrics オブジェクト
オブジェクトタイプ : ju.Map[String, JLong]
StateOperatorProgress には、 customMetricsというフィールドがあり、これには、メトリクスを収集するときに使用している機能に固有のメトリクスが含まれています。
機能 | 説明 |
|---|---|
RocksDB 状態ストア | |
HDFS 状態ストア | |
ストリーム重複排除 | |
ストリーム集約 | |
ストリーム結合演算子 | |
|
RocksDB 状態ストア custom メトリクス
RocksDB Capturing メトリクスから収集された、構造化ストリーミング ジョブに対して保持するステートフルな値に関するパフォーマンスと操作に関する情報。詳細については、「RocksDBでの 状態ストアの構成Databricks 」を参照してください。
フィールド | 説明 |
|---|---|
| RocksDB ファイル マネージャーによって追跡されたコピーされたバイト数。 |
| ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。 |
| チェックポイントのコミット中の圧縮時間 (ミリ秒単位) (オプション)。 |
| コミット中の圧縮時間 (ミリ秒単位)。 |
| ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。 |
| RocksDB インメモリをフラッシュする時間 (ミリ秒単位) がローカル ディスクに変わります。 |
| チェックポイント・コミットの一部としてバックグラウンド・ワーカー・スレッドを停止している時間 (ミリ秒単位) (コンパクションなど)。 |
| ステージングされた書き込みをインメモリ構造 ( |
| RocksDB ファイル マネージャーによって追跡されたコピーされたファイルの数。 |
| RocksDB ファイル マネージャーによって追跡された再利用されたファイルの数。 |
|
|
| 基になるネイティブ |
| RocksDB のブロック キャッシュからのキャッシュ ヒット数。 |
| RocksDBでのブロックキャッシュミスの数。 |
| RocksDB インスタンス内のすべての静的ソートテーブル (SST) ファイルのサイズ。 |
|
|
|
|
| イテレータを使用して読み取られた非圧縮データの合計バイト数。一部のステートフル操作 ( |
| コンパクション・プロセスがディスクから読み取るバイト数。 |
| 圧縮プロセスがディスクに書き込む合計バイト数。 |
| RocksDB コンパクションの時間 (ミリ秒単位) (バックグラウンドコンパクションとコミット中に開始されたオプションのコンパクションを含む)。 |
| バックグラウンド フラッシュを含む合計フラッシュ時間。フラッシュ操作は、 |
| ファイルマネージャーによって報告される非圧縮zipファイルのサイズ(バイト単位)。 ファイルマネージャーは、SSTファイルの物理ディスク容量の使用と削除を管理します。 |
| チェックポイントの場所に保存されている最新バージョンの RocksDB スナップショット。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリクスは特定のパーティション ID と状態ストア名に適用されます。 |
| put call の合計レイテンシ。 |
| put 呼び出しの数。 |
| ライターは、コンパクションまたはフラッシュが完了するまでの待機時間です。 |
| flush によって書き込まれた合計バイト数 |
| ピン留めブロックのメモリ使用量 |
| 内部カラム・ファミリの内部キーの数 |
| 外部列ファミリの数 |
| 内部列ファミリの数 |
HDFS 状態ストア custom メトリクス
HDFS状態ストア プロバイダーの動作と操作について収集された情報。
フィールド | 説明 |
|---|---|
| 現在のバージョンのみの状態の推定サイズ。 |
| プロバイダーにキャッシュされた状態でヒットしたキャッシュの数。 |
| プロバイダーにキャッシュされた状態のキャッシュミスの数。 |
| 特定の状態ストア インスタンスのスナップショットの最後にアップロードされたバージョン。 |
重複排除のカスタムメトリクス
重複排除の動作と操作に関して収集された情報。
フィールド | 説明 |
|---|---|
| 削除された重複行の数。 |
| ステートの削除中に読み取られたステートの行の数。 |
集計カスタムメトリクス
集計の動作と操作に関して収集された情報。
フィールド | 説明 |
|---|---|
| ステートの削除中に読み取られたステートの行の数。 |
ストリーム結合カスタムメトリクス
ストリーム結合の動作と操作に関して収集された情報。
フィールド | 説明 |
|---|---|
| スキップされた |
transformWithState カスタムメトリクス
transformWithState(TWS)の動作と操作について収集された情報。transformWithStateの詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。
フィールド | 説明 |
|---|---|
| 初期状態の処理に要したミリ秒数。 |
| 値の状態変数の数。 |
| リスト状態変数の数。 |
| マップ状態変数の数。 |
| 削除された状態変数の数。 |
| すべてのタイマーを処理するのにかかったミリ秒数。 |
| 登録されたタイマーの数。 |
| 削除されたタイマーの数。 |
| 期限切れのタイマーの数。 |
| TTL を持つ値の状態変数の数。 |
| TTL を持つリスト状態変数の数。 |
| TTL を持つマップ状態変数の数。 |
| TTL の有効期限が切れたために削除された値の数。 |
| TTL の有効期限が切れたために段階的に削除された値の数。 |
ソース オブジェクト
オブジェクトタイプ : Array[SourceProgress]
sources オブジェクトには、ストリーミングデータソースの情報とメトリクスが含まれています。
フィールド | 説明 |
|---|---|
| ストリーミング データソース テーブルの詳細な説明。 |
| ストリーミング ジョブが開始されたデータソース テーブル内の開始オフセット番号。 |
| マイクロバッチによって処理された最後のオフセット。 |
| マイクロバッチによって処理された最新のオフセット。 |
| このソースから処理された入力ローの数。 |
| このソースから処理するためにデータが到着する速度 (秒単位)。 |
| Spark がこのソースからのデータを処理している速度。 |
| タイプ: |
Databricks には、次のソース オブジェクトの実装が用意されています。
sources.<startOffset / endOffset / latestOffset>.* の形式 (または何らかのバリエーション) で定義されたフィールドの場合、それを (最大 3 つの) 可能なフィールドのいずれかとして解釈し、すべてに指定された子フィールドが含まれています。
sources.startOffset.<child-field>sources.endOffset.<child-field>sources.latestOffset.<child-field>
Delta Lake ソース オブジェクト
Delta Lakeテーブルのストリーミングデータソースに使用されるカスタムメトリクスの定義。
フィールド | 説明 |
|---|---|
| ストリーミング クエリの読み取り元となるソースの説明。 たとえば、 |
| このオフセットのエンコードに使用するシリアル化のバージョン。 |
| 読み取られるテーブルの ID。 これは、クエリの再開時の設定ミスを検出するために使用されます。 Map Unity Catalog、Delta Lake、および構造化ストリーミング メトリクス テーブルの識別子を参照してください。 |
| 現在処理中のテーブルのバージョン。 |
| このバージョンの |
| 現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを識別します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが最初に処理され、次に新しいデータが到着します。 |
| イベント時間の順序として記録されたイベント時間。処理が保留中の初期スナップショット データのイベント時刻。イベント時間順の初期スナップショットを処理するときに使用されます。 |
| マイクロバッチクエリによって処理された最新のオフセット。 |
| このソースから処理された入力ローの数。 |
| このソースから処理のためにデータが到着する速度。 |
| Spark がこのソースからのデータを処理している速度。 |
| 未処理のファイル(RocksDBによって追跡されるファイル)の合計サイズ。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。 |
| 処理すべき未処理ファイルの数。 これは、 Delta と Auto Loader as the ストリーミング ソースのバックログ メトリクスです。 |
Apache Kafka ソースオブジェクト
Apache Kafka ストリーミング データソースに使用されるカスタム メトリクスの定義。
フィールド | 説明 |
|---|---|
| Kafka ソースの詳細な説明で、読み取られる正確な Kafka トピックを指定します。 たとえば、 |
| ストリーミングジョブが開始された Kafka トピック内の開始オフセット番号。 |
| マイクロバッチによって処理された最後のオフセット。進行中のマイクロバッチ実行の場合、これは |
| マイクロバッチによって算出された最新のオフセット値。マイクロバッチ処理では、スロットリングが発生するとすべてのオフセットが処理されない場合があり、その結果、 |
| このソースから処理された入力ローの数。 |
| このソースから処理のためにデータが到着する速度。 |
| Spark がこのソースからのデータを処理している速度。 |
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で最新の使用可能なオフセットの背後にあるオフセットの平均数。 |
| クエリ プロセスがサブスクライブされたトピックから消費しなかった推定バイト数。 |
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最大数。 |
| ストリーミング クエリが、サブスクライブされたすべてのトピックの中で使用可能な最新のオフセットの背後にあるオフセットの最小数。 |
Databricks Runtime 17.1以降では、各マイクロバッチの完了後に最新のKafkaオフセットが取得されます。データを継続的に受信するトピックでは、バックログ メトリクスに小さく永続的なゼロ以外の値が表示される場合があります。 これは想定される動作であり、ストリームが遅延していることを示すものではありません。
Databricks Runtime 17.0以前のバージョンでは、最新のKafkaオフセットはマイクロバッチの開始時に取得されます。ストリーミングクエリがマイクロバッチの開始時に利用可能なすべてのレコードを一貫して消費する場合、バックログメトリクスは0返す可能性があります。
AWS Kinesis ソースオブジェクト
AWS Kinesis ストリーミングデータソースに使用されるカスタムメトリクスの定義。
フィールド | 説明 |
|---|---|
| Kinesis ソースの説明で、ストリーミングクエリが読み取っている正確な Kinesis ストリームを指定します。 たとえば、 |
| コンシューマがストリームの先頭から遅れた平均ミリ秒数。 |
| コンシューマがストリームの先頭から遅れた最大ミリ秒数。 |
| コンシューマがストリームの先頭から遅れた最小ミリ秒数。 |
| 処理に残っているバイト数。 これは、ソースとしての Kinesis のバックログメトリクスです。 |
| Kinesis ストリームの名前。 |
| Kinesis ストリームシャードの ID。 |
| 特定のバッチで消費された Kinesis シャード内のレコードの最初のシーケンス番号。 |
| 特定のバッチで Kinesis シャードから消費されたレコードの最後のシーケンス番号。 |
| Kinesis シャードが Kinesis ストリームによって閉じられている場合。 |
| ストリーミングクエリが Kinesis ストリームの最新データからどれだけ遅れているかを示すおおよその時間。 |
| 最後に消費されたレコードのシーケンス番号で、データ損失の検証に使用されます。 |
| ストリーミング クエリの実行に使用されるコンシューマー モード。 |
| このマイクロバッチで処理されたKinesisストリームの数。 |
| このマイクロバッチで処理されたシャードの総数。 |
| このマイクロバッチで処理されたクローズドシャードの数。 |
| このマイクロバッチで処理されたバイト数。 |
| このマイクロバッチで処理されたレコード数。 |
| EFO モードで使用される登録済みコンシューマーの数。 |
詳細については、 「Monitor Kinesis 」を参照してください。
Auto Loader ソース メトリクス
Auto Loader ストリーミング データソースに使用されるカスタム メトリクスの定義。
フィールド | 説明 |
|---|---|
| 処理中のファイルのシーケンス内の現在位置 (検出された順序で)。 |
| cloudFiles ソースの実装バージョン。 |
| 最新のバックフィル操作の開始時刻。 |
| 最新のバックフィル操作の終了時刻。 |
| ストリームが再開される前の、ユーザーが指定したストリームの最後の入力パス。 |
| バックログ内のファイルの数 |
| バックログ内のファイルのサイズ (バイト) |
| メッセージ・キューのおおよそのサイズ。cloudFiles.useNotifications オプションが有効な場合のみ。 |
| このソースから処理された入力ローの数。 |
PubSub ソースのメトリクス
PubSub ストリーミング データ ソースに使用されるカスタム メトリクスの定義。 Pub/Sub ストリーミング ソースのモニタリングの詳細については、「Pub/Sub ストリーミング メトリクスのモニタリング」を参照してください。
フィールド | 説明 |
|---|---|
| このオフセットがエンコードされる実装バージョン。 |
| 処理中の永続化されたシーケンス番号。 |
| 処理中の最大のフェッチ エポック。 |
| 現在のバックログで処理可能なレコードの数。 |
| 現在のバックログ内の未処理データの合計サイズ (バイト単位)。 |
| ストリームの開始以降にストリームによって処理された重複レコードの合計数。 |
Pulsar ソースのメトリクス
Pulsar ストリーミング データソースに使用されるカスタム メトリクスの定義。
フィールド | 説明 |
|---|---|
| 現在のマイクロバッチで処理された行の数。 |
| 現在のマイクロバッチで処理された合計バイト数。 |
sink オブジェクト
オブジェクトタイプ : SinkProgress
フィールド | 説明 |
|---|---|
| シンクの説明。使用されている特定のシンクの実装について詳しく説明します。 |
| 出力ローの数。シンクの種類が異なれば、値の動作や制限も異なる場合があります。サポートされている特定のタイプを参照してください |
|
|
現在、Databricks では、次の 2 つの特定の DATA sink オブジェクト実装を提供しています。
シンクタイプ | 詳細 |
|---|---|
Delta Lake テーブル | Delta シンク オブジェクトを参照してください。 |
Apache Kafka トピック | Kafka シンクオブジェクトを参照してください。 |
sink.metrics フィールドは、sink オブジェクトの両方のバリアントで同じように動作します。
Delta Lake シンク オブジェクト
フィールド | 説明 |
|---|---|
| Deltaシンクの説明。使用されている特定の Delta シンクの実装について詳しく説明します。たとえば、 |
| Spark は DSv1 シンク (Delta Lake シンクの分類) の出力行を推測できないため、行数は常に |
Apache Kafka シンク オブジェクト
フィールド | 説明 |
|---|---|
| ストリーミング クエリが書き込んでいる Kafka シンクの説明 (使用されている特定の Kafka シンクの実装について詳しく説明します)。 たとえば、 |
| マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行数。状況によっては、この値は「-1」になることがあり、一般的には「不明」と解釈されます。 |
例
例 Kafka-to-Kafka StreamingQueryListener イベント
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Delta Lake から Delta Lake への StreamingQueryListener イベントの例
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/<user>/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
キネシスからデルタレイクへのストリーミングクエリリスナーイベントの例
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/<run-id>/deltaTable]",
"numOutputRows" : -1
}
}
Kafka+Delta Lake 間の StreamingQueryListener イベントの例
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener イベントへのレート ソースの例
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/<user>.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}