メインコンテンツまでスキップ

イベントログで取り込みゲートウェイの進行状況を監視する

適用対象 :赤い×印のアイコン SaaSコネクタ緑色のチェックマークデータベースコネクタ

イベント ログを使用して、取り込みゲートウェイの進行状況をリアルタイムで監視する方法を学習します。イベント ログは、スナップショットとチェンジデータ キャプチャ ( CDC ) フェーズの両方のテーブルごとのメトリクスを提供し、パイプラインの健全性を追跡し、停止したパイプラインを特定し、自動モニタリング ソリューションを構築できるようにします。

進捗イベントでは次のことが可能になります。

  • パイプラインの完了を待たずに、テーブルごとに取り込まれた行数とバイト数を追跡します。
  • 各テーブルのスナップショットの進捗状況を監視し、大規模な初期ロードの完了を予測します。
  • テーブルごとのETAを使用して、長時間実行中のスナップショットがいつ完了するかを推定します。
  • テーブルごとにエンドツーエンドのCDC検出レイテンシー(ソースコミットからイベントログへの出力まで)を計測します。
  • 取り込まれた各テーブルを個別に監視して、ボトルネックや問題を特定します。
  • データの変更が発生していない場合でもイベントを受信し、パイプラインがアクティブに実行されていることを確認します。
  • ログを解析する代わりに、構造化されたイベント データを使用してアラートとダッシュボードを構築します。

進捗イベントの仕組み

ゲートウェイは、パイプライン内の各テーブルに対して、以下のイベントタイプを一定間隔(デフォルト: 5分)で出力します。

  • flow_progress イベントはスナップショットフローとCDCフローの行とバイトのカウンターを報告します。これらのイベントのメトリクスはデルタです。排出ごとにゼロにリセットされます。CDCフローの場合、これらのイベントには、エンドツーエンドの検出レイテンシーおよびアップロードパイプラインのパフォーマンスを測定するレイテンシー メトリクスも含まれます。
  • operation_progress イベントは、スナップショットの進捗を割合で報告します。スナップショットフローは、flow_progressに加えてこれらのイベントを出力します。進捗率は累積です。スナップショットの有効期間中に、0から100まで蓄積されます。これらのイベントには、スナップショットが完了するまでの残りの推定時間(estimated_completion_ms)も含まれます。

各イベントには以下が含まれます。

  • ソーステーブルと宛先テーブルの名前。
  • テーブルごとのメトリクス: 更新/挿入された行、削除された行 ( CDCのみ)、出力バイト数、および進行状況のパーセンテージ (スナップショットの場合)。
  • CDCフローでは、検出レイテンシーやバッチ処理時間などのレイテンシーメトリクスが含まれます。
  • スナップショットの完了までの推定残り時間です。
  • イベントが生成された日時。

イベントはイベント ログ テーブルで使用できますが、パブリックAPIs経由では使用できません。 SQLを使用してイベント ログ テーブルにクエリを実行し、パイプラインの動作を分析し、モニタリング ソリューションを構築できます。

アクセス進行イベント

進行状況イベントはイベント ログ テーブルに保存されます。アクセスするには:

  1. Databricks ワークスペースでゲートウェイに移動します。
  2. UI でイベントを表示するには、 「イベント ログ」 タブをクリックします。
  3. 詳細な分析を行うには、SQL を使用してイベント ログ テーブルを直接クエリします。

イベントログテーブルをクエリする

flow_progressイベントの行数とバイト数を照会するには:

SQL
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

スナップショットの進行状況パーセンテージを取得するために、 operation_progressイベントを照会するには:

SQL
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

<pipeline-id>ゲートウェイ ID に置き換えます。

イベントの構造を理解する

進行状況イベントでは、ログレベルMETRICSで以下のいずれかのイベントタイプを使用します。

  • flow_progressスナップショットフローとCDCフローの両方で発行されます。テーブルごとの行数とバイト数の差分を報告します。
  • operation_progressスナップショットフローでのみ出力されます。テーブルのスナップショット完了率を報告します。

以下の例は、各イベントタイプのJSON構造を示しています。

スナップショットフローの進行状況イベント構造

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}

CDCフロー進行イベント構造

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
},
"streaming_metrics": {
"discovery_latency_ms": 12450,
"batch_processing_time_ms": 8100,
"event_time": {
"max": "2025-10-14T13:33:45.000Z"
}
}
}
},
"maturity_level": "STABLE"
}

スナップショット操作の進行状況イベント構造

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"estimated_completion_ms": 1885000,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}

イベントフィールド

次の表は、進行状況イベントの主要なフィールドについて説明しています。

フィールド

Type

説明

event_type

String

flow_progress (スナップショットとCDCの行数とバイト数のカウンター)またはoperation_progress (スナップショットの進行状況のパーセンテージ)。

level

String

常にMETRICS

timestamp

String

イベントが生成された際の ISO 8601 タイムスタンプ。

origin.pipeline_type

String

常にINGESTION_GATEWAY

origin.pipeline_name

String

ゲートウェイの名前。

origin.dataset_name

String

取り込まれるテーブルの名前。

origin.catalog_name

String

Unity Catalogカタログ名。

origin.schema_name

String

Unity Catalogスキーマ名。

origin.flow_name

String

取り込みフェーズを示すフロー識別子。形式: 初期ロードの場合は{catalog}.{schema}.{table}_snapshot_flow 、増分変更の場合は{catalog}.{schema}.{table}_cdc_flow

origin.ingestion_source_type

String

ソース データベースの種類 (例: SQLSERVERMYSQLPOSTGRESQLORACLE )。

details:flow_progress.status

String

現在のフローのステータス。通常はRUNNING

details:flow_progress.metrics.num_upserted_rows

Integer

前回のイベント以降に挿入または更新された行数。Deltaメトリクス。 放出ごとにゼロにリセットされます。

details:flow_progress.metrics.num_deleted_rows

Integer

前回のイベント以降に削除された行数。Deltaメトリクス。 放出ごとにゼロにリセットされます。スナップショットフローの場合はnull (スナップショットは削除しません)。

details:flow_progress.metrics.num_output_bytes

Integer

前回のイベント以降にボリュームにアップロードされた圧縮バイト数。Deltaメトリクス。 放出ごとにゼロにリセットされます。スナップショットフローとCDCフローの両方に対応しています。

details:flow_progress.streaming_metrics.discovery_latency_ms

Integer

event_time.maxにおけるソースの変化からこのイベントが発信されるまでの時間(ミリ秒)。CDCフローのみです。ソースが変更タイムスタンプを公開していない場合、「null」になります。

details:flow_progress.streaming_metrics.batch_processing_time_ms

Integer

ゲートウェイが最新の変更バッチの読み取りとアップロードに費やしたミリ秒。ソース側の遅延は含まれません。CDCフローのみです。最初のバッチが完了するまで、null になります。

details:flow_progress.streaming_metrics.event_time.max

String

このテーブルについてゲートウェイが読み取った最新のソース変更のISO 8601タイムスタンプです。CDCフローのみです。

details:operation_progress.type

String

操作の種類。スナップショット操作の場合はCDC_SNAPSHOT

details:operation_progress.status

String

現在の稼働状況。スナップショットの実行中はIN_PROGRESS 、完了するとCOMPLETEDなります。その他の値には、 STARTEDCANCELEDFAILEDが含まれます。

details:operation_progress.duration_ms

Integer

操作にかかった合計時間(ミリ秒)。

details:operation_progress.progress_percent

Double

スナップショット完了率( 0.0 - 100.0 )。累積値であり、差分値ではない。チャンクの処理が完了するにつれて値が増加し、スナップショットが完了すると100.0に達します。

details:operation_progress.estimated_completion_ms

Integer

スナップショット完了までの推定残り時間 (ミリ秒)スナップショットの進行に伴い減少し、完了時に0に達します。進捗が遅れる場合、一時的に増加する可能性があります。推定値を生成するのに十分なデータが処理されるまでは、 nullなる可能性があります。

details:operation_progress.cdc_snapshot.target_table_name

String

スナップショットを作成するテーブルの完全修飾名。

maturity_level

String

常にSTABLE

メトリクスの動作

Progress Metrix は次のカテゴリに分類されます。

Deltaメトリクス ( num_upserted_rowsnum_deleted_rowsnum_output_bytes ):

  • 前回のイベント以降の変動を表し、累積合計を表すものではありません。
  • 各イベント発生後にゼロにリセットしてください。
  • データ変更が発生していない場合でも発信され、活性指標として機能します。
  • スナップショットフローの場合、スナップショットでは削除が行われないため、 num_deleted_rowsnullになります。

累積メトリクス ( progress_percent ):

  • スナップショットの有効期間中に、値は0.0から100.0まで累積されます。
  • スナップショットが進行するにつれて更新されます。小さなテーブルの場合、値は0.0から100.0に直接ジャンプする可能性があります。大規模なテーブルの場合、値は徐々に更新され、正確な行数ではなく近似値です。

時点メトリクスdiscovery_latency_msbatch_processing_time_msevent_time.maxestimated_completion_ms ):

  • 各値は、イベントが発行された瞬間のメトリクスの状態を反映しています。
  • discovery_latency_msbatch_processing_time_ms はCDCフローにのみ適用されます。結果が負の値になる場合は、値は0として表示されます。
  • event_time.max CDCフローにのみ適用されます。値は、ゲートウェイが読み取った最新のソース変更のタイムスタンプです。
  • estimated_completion_ms スナップショットフローにのみ適用されます。スナップショットが進むにつれて値は減少し、完了時に0に達します。

進行イベントを構成する

新しいゲートウェイでは、進行状況イベントがデフォルトで有効になっています。パイプライン設定を使用してイベントの動作をカスタマイズできます。

進行状況イベントを有効または無効にする

JSON
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}

進行状況イベントを無効にするには、 "false"に設定します。

イベントの発信頻度を調整する

JSON
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

デフォルト:300秒(5分)。有効範囲:30秒~3600秒(30秒~1時間)。この設定は、 flow_progressoperation_progress両方のイベントの発生頻度を制御します。

ゲートウェイ構成の例

次の例は、進行状況イベントが有効になっていて、5 分ごとに発行するように設定された完全なゲートウェイ構成を示しています。

Python
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}

重要な動作と制限

デフォルトの動作

  • この機能は、すべての新しいゲートウェイに対してデフォルトで有効になっています。
  • 既存のパイプラインは、次回の更新または再起動時にこの機能を自動的に受け取ります。
  • 進行状況イベントを有効にするのに必要なアクションはありません。

ゲートウェイバージョン間のメトリクス利用可能性

スナップショットETA(estimated_completion_ms)およびCDCレイテンシメトリクス(streaming_metrics)には、2026年5月以降の取り込みゲートウェイリリースからのゲートウェイイメージが必要です。Databricksはゲートウェイイメージを自動的に選択します。手動で設定することはできません。この機能は2026年5月にすべての本番運用リージョンに展開されました。

新規作成されたパイプラインと既存のパイプラインはどちらも新しいイメージを受け取ります。既存のパイプラインは、次回の更新または再起動時に自動的に適用されるため、移行する必要はありません。

パイプラインがestimated_completion_msおよびstreaming_metricsのフィールドを出力するかどうかを確認するには、次のクエリを実行してください。両方の列が返された場合、ゲートウェイはスナップショットETAおよびCDCレイテンシのメトリクスをサポートしています。

SQL
SELECT
MAX(CASE WHEN details:operation_progress:estimated_completion_ms IS NOT NULL
THEN timestamp END) AS last_snapshot_eta,
MAX(CASE WHEN details:flow_progress:streaming_metrics IS NOT NULL
THEN timestamp END) AS last_cdc_latency
FROM event_log('<pipeline-id>')
WHERE event_type IN ('operation_progress', 'flow_progress')
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR

パイプラインの再起動後、完全な排出間隔(デフォルト:5分)が経過してもどちらの列にも最新のタイムスタンプがない場合は、Databricksサポートに連絡して、お住まいの地域でこれらの機能が有効になっているかどうかを確認してください。

タイミングの考慮

  • 最初のエミッションには、パイプラインの開始後、進行イベントが表示されるまで、構成された頻度間隔 (デフォルト: 5 分) までかかる場合があります。
  • アクティブな取り込み中は、設定された頻度でイベントが発行されます。

ゼロ更新メトリクス

  • 更新がゼロのテーブルも含め、すべてのテーブルに対してイベントが発行されます。

  • ゼロ更新メトリクスは、次のことを区別するのに役立ちます。

    • アイドル テーブル: 処理されましたが、データの変更は発生しませんでした。
    • 未処理のテーブル: パイプラインによってまだ取得されていません。
  • ゼロ更新イベントは、パイプラインがアクティブに実行されていることを確認する活性信号として機能します。

スナップショットの進捗率の動作

  • スナップショットの進行状況は(completed_chunks / total_chunks) × 100として計算されます。メトリクスはおおよその値であり、正確な行レベルのパーセンテージではありません。
  • 複数のチャンクに分割されていないテーブル(通常はより小さなテーブル)は、追跡するチャンクが1つしかないため、発行間で0.0から100.0に直接ジャンプします。
  • 多数のチャンクに分割された大きなテーブルは、各チャンクが完了するにつれて段階的に更新され、徐々に進行状況を示すシグナルを提供するため、 長時間かかる初期ロードに役立ちます。
  • COMPLETEDステータスは常にprogress_percent = 100.0を報告します。
  • メトリクスは、パイプラインの更新または再起動後には存続しません。 再起動後、スナップショットの進行状況は最後にコミットされたチェックポイントから再開され、メトリクスは再開された位置から上昇を続けます。

サンプルクエリ

以下のサンプルクエリは、行数、出力バイト数、スナップショットの進捗状況とETA、およびCDCレイテンシーメトリクスを使用してゲートウェイを監視する方法を示しています。各クエリで、<pipeline-id>をゲートウェイIDに置き換えます。

ヒント

取り込みゲートウェイ プログレスモニターノートブックには、このセクションのすべてのクエリーが含まれています。ゲートウェイが実行されているワークスペースにノートブックをインポートし、ゲートウェイ ID を指定してください。スナップショットの進捗、CDCレイテンシー、および行数を検査するためにノートブックを実行してください。モニタリング要件に合わせてSLAしきい値を調整することもできます。

行数クエリ

テーブルごとの売上高(過去24時間)

過去24時間における各テーブルのアップサートと削除の合計数。取り込みフェーズはフロー名の接尾辞から分類されます。これをダッシュボードの見出しとして使用すれば、どのテーブルが最も多くのデータを移動したかを確認できます。

SQL
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC

最近の進捗状況(過去1時間)

パイプライン内のすべてのテーブルの最新行数カウンター。短期モニタリングに役立ちます。

SQL
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

サイレントテーブルまたはスタックテーブルを特定する

イベントを発信しているにもかかわらず、過去60分間にアップサートと削除がゼロであると報告しているテーブルは、「スタック」状態になる可能性が高い。番号を変更する必要があるかどうか、さらに調査してください。 CDCの表のうち、実際に利用されていないもの(例えば、夜間など)もここに表示されます。

SQL
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC

テーブルごとのタイムラインと累計値

過去24時間における、単一のフローに関するイベントごとの完全な履歴と、累積行数。<flow-pattern> SQL LIKEパターン (例: '%customers%_cdc_flow' ) に置き換えます。

SQL
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp

出力バイトクエリ

テーブルあたりのバイト数(過去24時間)

過去24時間にテーブルごとにボリュームにアップロードされた合計バイト数(MBおよびGB単位)。最もデータ量の多いテーブルを表示するには、降順で並べ替えてください。

SQL
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC

スループットの傾向(MB/分)

過去24時間における全フローでのアップロードバイト数の、1分ごとの時系列データ。スループットのパターンや停滞箇所を特定するために、折れ線グラフとして表示します。

SQL
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute

行あたりの平均バイト数(ワイドテーブルまたはLOB検出器)

num_output_bytesと行数を結合して、テーブルごとの行ごとの平均バイト数を計算します。 高い値は通常、スループットコストを押し上げるLOBテーブルまたはワイドスキーマテーブルを示しています。キャパシティプランニングやスキーマレビューに役立ちます。

SQL
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC

スナップショットの進捗状況に関するクエリ

全体的なスナップショットの進捗状況

次のクエリは、完了、進行中、またはキューにあるテーブルの数の単一行の概要を返します。結果には、イベントログの保持期間内におけるすべてのパイプライン更新を通じて、テーブルごとに報告された最新の状態が反映されます。そのため、以前の更新で完了したテーブルは、更新または再起動後も引き続きtables_completedにカウントされます。

SQL
WITH latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1

テーブルごとのスナップショットステータスボード

以下のクエリは、パイプライン内の各テーブルと、その最新の報告ステータスおよび進捗率を返します。結果には、以前の更新で完了したテーブルが含まれます。そのため、欠落している行として表示されるテーブルはありません。

SQL
WITH table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC

現在の更新で読み込まれたスナップショットの行数とバイト数

次のクエリは、現在のスナップショット実行における各テーブルの進捗率、アップサートされた行、およびアップロードされたバイト数を組み合わせています。

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC

停止したスナップショットの検出

次のクエリは、過去30分間にprogress_percentが変更されていないスナップショットテーブルを返します。停止しているが、まだアクティブであるスナップショットを特定するために使用します。

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC

テーブルごとのスナップショット完了予定時刻

以下のクエリは、アクティブなスナップショット実行における各テーブルの現在の進捗率および完了までの推定時間を返します。IN_PROGRESS のステータスのテーブルのみが返されます。完了済みおよびキューにあるテーブルを含めるには、status = 'IN_PROGRESS'フィルターを削除します。

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_flow AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:estimated_completion_ms::bigint AS eta_ms,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
eta_ms,
ROUND(eta_ms / 1000.0 / 60.0, 1) AS eta_minutes,
ROUND(eta_ms / 1000.0 / 3600.0, 2) AS eta_hours,
timestamp AS last_update
FROM latest_per_flow
WHERE rn = 1
AND status = 'IN_PROGRESS'
ORDER BY eta_ms DESC NULLS LAST

CDC レイテンシークエリ

CDCのテーブルごとの最新性

ゲートウェイは、各CDCフローについて、streaming_metrics内に複数の可観測性フィールドを報告します。テーブルが最新であるかどうか、どのテーブルが遅延しているか、そして遅延がソース側かゲートウェイ側かを確認できます。

これらは取り込みゲートウェイのレイテンシーです。ソースデータベースからゲートウェイを経由してUnity Catalogボリュームまでのパスを測定します。Unity Catalog ボリュームから宛先テーブルへのダウンストリームアプライヤーのレイテンシーは含まれていません。そのステージはアプライヤーのイベントログで個別に観測されます。

フィールド

説明

event_time.max

このテーブルのソースデータベースからゲートウェイが読み取った最新の変更のISO 8601タイムスタンプ。この値がイベント間で変化しなくなった場合、ソースは変更を生成しなくなりました、またはゲートウェイがソースからソースから読み取れなくなります。

discovery_latency_ms

event_time.maxでのソース変更から、ゲートウェイがこのイベントを発行した時点までのミリ秒。ソースデータベースからUnity Catalogボリュームまでの完全なパスを網羅します。より低い値は、ボリューム内のデータがより新しいことを意味します。

batch_processing_time_ms

ゲートウェイが最新の変更バッチの読み取りとアップロードに費やしたミリ秒。ソース データベース側での遅延は含まれません。ソースデータベースの遅延を推定するには、discovery_latency_msからこの値を減算します。

discovery_latency_msbatch_processing_time_ms の関係から、ゲートウェイパス (ソースデータベースから Unity Catalog ボリュームまで) のどこでレイテンシが集中しているかを確認できます。

パターン

意味

両方のスモール

CDC は実行中で、最新の状態です。

discovery_latency_ms 高い、batch_processing_time_ms 低い

ソース側の遅延。ゲートウェイは速やかに読んでいます。ソースからコミットが遅延しました(レプリケーションの遅延、ソースログのバックログ、実行時間の長いソーストランザクション)。

両方とも高い

ゲートウェイ側の遅延。アップロードパイプラインはボトルネックです。ゲートウェイコンピュートリソースおよびUnity Catalogボリュームへのネットワークパスを確認してください。

discovery_latency_ms 1つのテーブルのみに限定されます。

テーブルごとのソースの問題(DDL処理中、ロック競合、ブロックされたレプリケーションスロット、スキーマ変更)

discovery_latency_ms 各テーブルで増分されます

ゲートウェイ全体での問題(リソース、ボリューム接続、ソース CDC ログバックログ、すべてのキャプチャに影響)

event_time.max 複数の排出物にわたって進んでいない

ソースはアイドル状態です、またはゲートウェイがソースデータベースへの接続を失いました。ソース CDC の有効化とゲートウェイログを確認してください。

次のクエリは、各CDCテーブルの過去30分間のCDCフレッシュネスイベントを返します。デフォルトの5分間隔で、これは1テーブルあたり約6行になります。結果はdiscovery_latency_msで並べ替えられるため、最も遅延しているテーブルが最初に表示されます。

SQL
SELECT
origin.flow_name AS flow_name,
details:flow_progress:streaming_metrics:event_time:max::string AS latest_source_commit_seen,
details:flow_progress:streaming_metrics:discovery_latency_ms::bigint AS discovery_latency_ms,
details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint AS batch_processing_time_ms,
timestamp AS emission_ts
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
ORDER BY discovery_latency_ms DESC NULLS LAST, flow_name, emission_ts DESC

CDC レイテンシ時系列

以下のクエリは、直近24時間における各CDCフローのdiscovery_latency_msbatch_processing_time_msの時間平均を返します。鮮度が低下した時期を特定するためにご利用ください。多数のCDCフローを含むパイプラインでは、結果セットを絞り込むには、フロー名でフィルタリングしてください。

SQL
SELECT
DATE_TRUNC('HOUR', timestamp) AS ts_hour,
origin.flow_name AS flow_name,
ROUND(AVG(details:flow_progress:streaming_metrics:discovery_latency_ms::bigint) / 1000.0, 2) AS avg_discovery_latency_sec,
ROUND(AVG(details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint) / 1000.0, 2) AS avg_batch_processing_sec
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('HOUR', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_hour

トラブルシューティング

進行イベントが表示されない

イベント ログに進行状況イベントが表示されない場合は、次の手順を実行します。

  1. pipelines.gateway.progressEventsEnabled"true"に設定されていることを確認してください。
  2. パイプラインの開始後、少なくとも 1 つの完全な間隔を待機します。デフォルトは 5 分です。
  3. パイプラインがアクティブに実行され、取り込まれていることを確認します。
  4. 進行状況イベントのみを表示するには、 level = 'METRICS'フィルターを含めます。

イベントが頻繁に発生したり、まれにしか発生しない

イベントが予想される頻度で発生しない場合は、次の手順に従ってください。

pipelines.gateway.progressEventEmitFrequencySeconds設定を確認し、必要に応じて調整します。

  • デフォルトは5分(300秒)です。
  • 有効な範囲: 30 ~ 3600 秒。必要に応じて調整してください。

パイプラインの再起動後にメトリクスがゼロになる

パイプラインの再起動後にメトリクスがゼロにリセットされた場合:

メトリクスはメモリ内のみであり、再起動、更新、または再開時にリセットされます。 これは実装を簡素化するための意図的なものです。パイプラインはすぐに新しいメトリクスの蓄積を開始します。

一部のテーブルにメトリクスがありません

一部のテーブルに進行状況イベントが表示されない場合:

  1. パイプライン構成でテーブルがフィルター処理されていないことを確認してください。
  2. CDC フェーズでは、ソース テーブルで CDC または変更追跡が有効になっていることを確認します。
  3. テーブルがゲートウェイ構成に含まれていることを確認します。
  4. スナップショットフローの場合、 progress_percentoperation_progressイベントでのみ出力されることに注意してください。CDC フローはoperation_progressイベントを発信しません。なぜなら、CDC には完了の概念がないからです。

estimated_completion_ms または streaming_metrics フィールドがありません

event_log 行が存在するにもかかわらず、estimated_completion_ms または streaming_metrics オブジェクトが見つからない場合は、「ゲートウェイバージョン間のメトリックの可用性」で診断クエリと最小ゲートウェイ要件を確認してください。

その他のリソース