イベントログで取り込みゲートウェイの進行状況を監視する
適用対象 : SaaSコネクタ
データベースコネクタ
イベント ログを使用して、取り込みゲートウェイの進行状況をリアルタイムで監視する方法を学習します。イベント ログは、スナップショットとチェンジデータ キャプチャ ( CDC ) フェーズの両方のテーブルごとのメトリクスを提供し、パイプラインの健全性を追跡し、停止したパイプラインを特定し、自動モニタリング ソリューションを構築できるようにします。
進捗イベントでは次のことが可能になります。
- パイプラインの完了を待たずに、テーブルごとに取り込まれた行数とバイト数を追跡します。
- 各テーブルのスナップショットの進捗状況を監視し、大規模な初期ロードの完了を予測します。
- 取り込まれた各テーブルを個別に監視して、ボトルネックや問題を特定します。
- データの変更が発生していない場合でもイベントを受信し、パイプラインがアクティブに実行されていることを確認します。
- ログを解析する代わりに、構造化されたイベント データを使用してアラートとダッシュボードを構築します。
進捗イベントの仕組み
ゲートウェイは、パイプライン内の各テーブルに対して、以下のイベントタイプを一定間隔(デフォルト: 5分)で出力します。
flow_progressイベントは、スナップショットおよび CDC フローの行数とバイト数を報告します。これらのイベントのメトリクスはデルタです。 それらは放出ごとにゼロにリセットされる。operation_progressイベントがスナップショットの進行状況をパーセンテージで報告します。スナップショットフローは、flow_progressに加えてこれらのイベントを発信します。進捗率は累積値です。スナップショットの有効期間中に、0から100まで蓄積されます。
各イベントには以下が含まれます。
- ソーステーブルと宛先テーブルの名前。
- テーブルごとのメトリクス: 更新/挿入された行、削除された行 ( CDCのみ)、出力バイト数、および進行状況のパーセンテージ (スナップショットの場合)。
- イベントが生成された日時。
イベントはイベント ログ テーブルで使用できますが、パブリックAPIs経由では使用できません。 SQLを使用してイベント ログ テーブルにクエリを実行し、パイプラインの動作を分析し、モニタリング ソリューションを構築できます。
アクセス進行イベント
進行状況イベントはイベント ログ テーブルに保存されます。アクセスするには:
- Databricks ワークスペースでゲートウェイに移動します。
- UI でイベントを表示するには、 「イベント ログ」 タブをクリックします。
- 詳細な分析を行うには、SQL を使用してイベント ログ テーブルを直接クエリします。
イベントログテーブルをクエリする
flow_progressイベントの行数とバイト数を照会するには:
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
スナップショットの進行状況パーセンテージを取得するために、 operation_progressイベントを照会するには:
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
<pipeline-id>ゲートウェイ ID に置き換えます。
イベントの構造を理解する
進行状況イベントでは、ログレベルMETRICSで以下のいずれかのイベントタイプを使用します。
flow_progressスナップショットフローとCDCフローの両方で発行されます。テーブルごとの行数とバイト数の差分を報告します。operation_progressスナップショットフローでのみ出力されます。テーブルのスナップショット完了率を報告します。
以下の例は、各イベントタイプのJSON構造を示しています。
スナップショットフローの進行状況イベント構造
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}
CDCフロー進行イベント構造
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
}
}
},
"maturity_level": "STABLE"
}
スナップショット操作の進行状況イベント構造
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}
イベントフィールド
次の表は、進行状況イベントの主要なフィールドについて説明しています。
フィールド | Type | 説明 |
|---|---|---|
| String |
|
| String | 常に |
| String | イベントが生成された際の ISO 8601 タイムスタンプ。 |
| String | 常に |
| String | ゲートウェイの名前。 |
| String | 取り込まれるテーブルの名前。 |
| String | Unity Catalogカタログ名。 |
| String | Unity Catalogスキーマ名。 |
| String | 取り込みフェーズを示すフロー識別子。形式: 初期ロードの場合は |
| String | ソース データベースの種類 (例: |
| String | 現在のフローのステータス。通常は |
| Integer | 前回のイベント以降に挿入または更新された行数。Deltaメトリクス。 放出ごとにゼロにリセットされます。 |
| Integer | 前回のイベント以降に削除された行数。Deltaメトリクス。 放出ごとにゼロにリセットされます。スナップショットフローの場合は |
| Integer | 前回のイベント以降にボリュームにアップロードされた圧縮バイト数。Deltaメトリクス。 放出ごとにゼロにリセットされます。スナップショットフローとCDCフローの両方に対応しています。 |
| String | 操作の種類。スナップショット操作の場合は |
| String | 現在の稼働状況。スナップショットの実行中は |
| Integer | 操作にかかった合計時間(ミリ秒)。 |
| Double | スナップショット完了率( |
| String | スナップショットを作成するテーブルの完全修飾名。 |
| String | 常に |
メトリクスの動作
Progress Metrix は次のカテゴリに分類されます。
Deltaメトリクス ( num_upserted_rows 、 num_deleted_rows 、 num_output_bytes ):
- 前回のイベント以降の変動を表し、累積合計を表すものではありません。
- 各イベント発生後にゼロにリセットしてください。
- データ変更が発生していない場合でも発信され、活性指標として機能します。
- スナップショットフローの場合、スナップショットでは削除が行われないため、
num_deleted_rowsはnullになります。
累積メトリクス ( progress_percent ):
- スナップショットの有効期間中に、値は
0.0から100.0まで累積されます。 - スナップショットのチャンクが完成するたびに更新されます。複数のチャンクに分割されていない小さなテーブルは、発行間で
0.0から100.0に直接ジャンプします。多数のチャンクに分割された大きなテーブルは、各チャンクが到着するたびに徐々に更新され、おおよその進捗状況を示すシグナルとなる。 - メトリクスは、正確な行レベルの数ではなく、大きな/チャンク化されたスナップショットの近似値として意図されています。
進行イベントを構成する
新しいゲートウェイでは、進行状況イベントがデフォルトで有効になっています。パイプライン設定を使用してイベントの動作をカスタマイズできます。
進行状況イベントを有効または無効にする
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}
進行状況イベントを無効にするには、 "false"に設定します。
イベントの発信頻度を調整する
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}
デフォルト:300秒(5分)。有効範囲:30秒~3600秒(30秒~1時間)。この設定は、 flow_progressとoperation_progress両方のイベントの発生頻度を制御します。
ゲートウェイ構成の例
次の例は、進行状況イベントが有効になっていて、5 分ごとに発行するように設定された完全なゲートウェイ構成を示しています。
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}
重要な動作と制限
デフォルトの動作
- この機能は、すべての新しいゲートウェイに対してデフォルトで有効になっています。
- 既存のパイプラインは、次回の更新または再起動時にこの機能を自動的に受け取ります。
- 進行状況イベントを有効にするのに必要なアクションはありません。
タイミングの考慮
- 最初のエミッションには、パイプラインの開始後、進行イベントが表示されるまで、構成された頻度間隔 (デフォルト: 5 分) までかかる場合があります。
- アクティブな取り込み中は、設定された頻度でイベントが発行されます。
ゼロ更新メトリクス
-
更新がゼロのテーブルも含め、すべてのテーブルに対してイベントが発行されます。
-
ゼロ更新メトリクスは、次のことを区別するのに役立ちます。
- アイドル テーブル: 処理されましたが、データの変更は発生しませんでした。
- 未処理のテーブル: パイプラインによってまだ取得されていません。
-
ゼロ更新イベントは、パイプラインがアクティブに実行されていることを確認する活性信号として機能します。
スナップショットの進捗率の動作
- スナップショットの進行状況は
(completed_chunks / total_chunks) × 100として計算されます。メトリクスはおおよその値であり、正確な行レベルのパーセンテージではありません。 - 複数のチャンクに分割されていないテーブル(通常はより小さなテーブル)は、追跡するチャンクが1つしかないため、発行間で
0.0から100.0に直接ジャンプします。 - 多数のチャンクに分割された大きなテーブルは、各チャンクが完了するにつれて段階的に更新され、徐々に進行状況を示すシグナルを提供するため、 長時間かかる初期ロードに役立ちます。
COMPLETEDステータスは常にprogress_percent = 100.0を報告します。- メトリクスは、パイプラインの更新または再起動後には存続しません。 再起動後、スナップショットの進行状況は最後にコミットされたチェックポイントから再開され、メトリクスは再開された位置から上昇を続けます。
サンプルクエリ
次のサンプル クエリは、行、バイト、スナップショットの進行状況メトリクスを使用してゲートウェイを監視する方法を示しています。 各クエリの<pipeline-id>ゲートウェイIDに置き換えてください。
行数クエリ
テーブルごとの売上高(過去24時間)
過去24時間における各テーブルのアップサートと削除の合計数。取り込みフェーズはフロー名の接尾辞から分類されます。これをダッシュボードの見出しとして使用すれば、どのテーブルが最も多くのデータを移動したかを確認できます。
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC
最近の進捗状況(過去1時間)
パイプライン内のすべてのテーブルの最新行数カウンター。短期モニタリングに役立ちます。
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
サイレントテーブルまたはスタックテーブルを特定する
イベントを発信しているにもかかわらず、過去60分間にアップサートと削除がゼロであると報告しているテーブルは、「スタック」状態になる可能性が高い。番号を変更する必要があるかどうか、さらに調査してください。 CDCの表のうち、実際に利用されていないもの(例えば、夜間など)もここに表示されます。
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC
テーブルごとのタイムラインと累計値
過去24時間における、単一のフローに関するイベントごとの完全な履歴と、累積行数。<flow-pattern> SQL LIKEパターン (例: '%customers%_cdc_flow' ) に置き換えます。
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp
出力バイトクエリ
テーブルあたりのバイト数(過去24時間)
過去24時間にテーブルごとにボリュームにアップロードされた合計バイト数(MBおよびGB単位)。最もデータ量の多いテーブルを表示するには、降順で並べ替えてください。
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC
スループットの傾向(MB/分)
過去24時間における全フローでのアップロードバイト数の、1分ごとの時系列データ。スループットのパターンや停滞箇所を特定するために、折れ線グラフとして表示します。
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute
行あたりの平均バイト数(ワイドテーブルまたはLOB検出器)
num_output_bytesと行数を結合して、テーブルごとの行ごとの平均バイト数を計算します。 高い値は通常、スループットコストを押し上げるLOBテーブルまたはワイドスキーマテーブルを示しています。キャパシティプランニングやスキーマレビューに役立ちます。
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC
スナップショットの進捗状況に関するクエリ
全体的なスナップショットの進捗状況
現在のスナップショット実行において、完了済み、処理中、またはキューに登録されているテーブルの数を1行で要約します。これをトップレベルのダッシュボード ウィジェットとしてピン留めすると、簡単なヘルス チェックが可能になります。
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1
テーブルごとのスナップショットステータスボード
現在のスナップショットに含まれるすべてのテーブルは、最新のステータスと進捗率で実行されます。どのテーブルが最も作業を必要としているかを確認するには、 progress_pctで並べ替えてください。
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC
現在の更新で読み込まれたスナップショットの行数とバイト数
現在のスナップショット実行における進捗率、アップサートされた行数、およびアップロードされたバイト数を合計します。これを使用して、ダッシュボードに「X%完了、N行、NMB」というタイルを表示します。
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC
停止したスナップショットの検出
過去30分間変更されていないスナップショットテーブルprogress_percent 。これをアラートクエリとしてスケジュール設定することで、処理が停止しているもののイベントを発信し続けているスナップショットを検出できます。
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC
トラブルシューティング
進行イベントが表示されない
イベント ログに進行状況イベントが表示されない場合は、次の手順を実行します。
pipelines.gateway.progressEventsEnabledが"true"に設定されていることを確認してください。- パイプラインの開始後、少なくとも 1 つの完全な間隔を待機します。デフォルトは 5 分です。
- パイプラインがアクティブに実行され、取り込まれていることを確認します。
- 進行状況イベントのみを表示するには、
level = 'METRICS'フィルターを含めます。
イベントが頻繁に発生したり、まれにしか発生しない
イベントが予想される頻度で発生しない場合は、次の手順に従ってください。
pipelines.gateway.progressEventEmitFrequencySeconds設定を確認し、必要に応じて調整します。
- デフォルトは5分(300秒)です。
- 有効な範囲: 30 ~ 3600 秒。必要に応じて調整してください。
パイプラインの再起動後にメトリクスがゼロになる
パイプラインの再起動後にメトリクスがゼロにリセットされた場合:
メトリクスはメモリ内のみであり、再起動、更新、または再開時にリセットされます。 これは実装を簡素化するための意図的なものです。パイプラインはすぐに新しいメトリクスの蓄積を開始します。
一部のテーブルにメトリクスがありません
一部のテーブルに進行状況イベントが表示されない場合:
- パイプライン構成でテーブルがフィルター処理されていないことを確認してください。
- CDC フェーズでは、ソース テーブルで CDC または変更追跡が有効になっていることを確認します。
- テーブルがゲートウェイ構成に含まれていることを確認します。
- スナップショットフローの場合、
progress_percentはoperation_progressイベントでのみ出力されることに注意してください。CDC フローはoperation_progressイベントを発信しません。なぜなら、CDC には完了の概念がないからです。