イベントログで取り込みゲートウェイの進行状況を監視する
適用対象 :データベースコネクタ
イベント ログを使用して、取り込みゲートウェイの進行状況をリアルタイムで監視する方法を学習します。イベント ログは、スナップショットとチェンジデータ キャプチャ ( CDC ) フェーズの両方のテーブルごとのメトリクスを提供し、パイプラインの健全性を追跡し、停止したパイプラインを特定し、自動モニタリング ソリューションを構築できるようにします。
進捗イベントでは次のことが可能になります。
- パイプラインの完了を待たずに、テーブルごとに取り込まれたデータの量を追跡します。
- 取り込まれた各テーブルを個別に監視して、ボトルネックや問題を特定します。
- データの変更が発生していない場合でもイベントを受信し、パイプラインがアクティブに実行されていることを確認します。
- ログを解析する代わりに、構造化されたイベント データを使用してアラートとダッシュボードを構築します。
進捗イベントの仕組み
ゲートウェイは、パイプライン内の各テーブルに対して、一定の間隔 (デフォルト: 5 分) でflow_progressイベントを発行します。各イベントには以下が含まれます:
- ソーステーブルと宛先テーブルの名前。
- 最後のイベント以降にアップサートおよび削除された行の数。
- イベントが生成された日時。
イベントはイベント ログ テーブルで使用できますが、パブリックAPIs経由では使用できません。 SQLを使用してイベント ログ テーブルにクエリを実行し、パイプラインの動作を分析し、モニタリング ソリューションを構築できます。
アクセス進行イベント
進行状況イベントはイベント ログ テーブルに保存されます。アクセスするには:
- Databricks ワークスペースでゲートウェイに移動します。
- UI でイベントを表示するには、 「イベント ログ」 タブをクリックします。
- 詳細な分析を行うには、SQL を使用してイベント ログ テーブルを直接クエリします。
イベントログテーブルをクエリする
SQL を使用して進行状況イベントをクエリするには:
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) as table_name,
details:flow_progress.metrics.num_upserted_rows as rows_upserted,
details:flow_progress.metrics.num_deleted_rows as rows_deleted,
CASE
WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN 'cdc'
WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN 'snapshot'
ELSE 'unknown'
END as ingestion_phase
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
<pipeline-id>ゲートウェイ ID に置き換えます。
イベントの構造を理解する
進行状況イベントは、 METRICSログ レベルのflow_progressイベント タイプを使用します。次の例は、スナップショットおよび CDC 進行状況イベントの JSON 構造を示しています。
スナップショット進行イベント構造
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": 0
}
}
},
"maturity_level": "STABLE"
}
CDC進捗イベント構造
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3
}
}
},
"maturity_level": "STABLE"
}
イベントフィールド
次の表は、進行状況イベントの主要なフィールドについて説明しています。
フィールド | Type | 説明 |
|---|---|---|
| String | 常に |
| String | 常に |
| String | イベントが生成された際の ISO 8601 タイムスタンプ。 |
| String | 常に |
| String | ゲートウェイの名前。 |
| String | 取り込まれるテーブルの名前。 |
| String | Unity Catalogカタログ名。 |
| String | Unity Catalogスキーマ名。 |
| String | 取り込みフェーズを示すフロー識別子。形式: 初期ロードの場合は |
| String | ソース データベースの種類 (例: |
| String | 現在のフローのステータス。通常は |
| Integer | 最後のイベント以降に挿入または更新された行数。 |
| Integer | 最後のイベント以降に削除された行数。 |
| String | 常に |
メトリクスの動作
- 行数は、累積合計ではなく、最後のイベント以降の変更を表します。
- 各イベントの発行後にカウントはゼロにリセットされます。
- データの変更が発生しない場合でもイベントが発行され、生存インジケーターとして機能します。
進行イベントを構成する
新しいゲートウェイでは、進行状況イベントがデフォルトで有効になっています。パイプライン設定を使用してイベントの動作をカスタマイズできます。
進行状況イベントを有効または無効にする
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}
進行状況イベントを無効にするには、 "false"に設定します。
イベントの発信頻度を調整する
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}
デフォルト: 300 秒 (5 分)。有効な範囲: 30 ~ 3600 秒 (30 秒~ 1 時間)。
ゲートウェイ構成の例
次の例は、進行状況イベントが有効になっていて、5 分ごとに発行するように設定された完全なゲートウェイ構成を示しています。
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}
重要な動作と制限
デフォルトの動作
- この機能は、すべての新しいゲートウェイに対してデフォルトで有効になっています。
- 既存のパイプラインは、次回の更新または再起動時にこの機能を自動的に受け取ります。
- 進行状況イベントを有効にするのに必要なアクションはありません。
タイミングの考慮
- 最初のエミッションには、パイプラインの開始後、進行イベントが表示されるまで、構成された頻度間隔 (デフォルト: 5 分) までかかる場合があります。
- アクティブな取り込み中は、設定された頻度でイベントが発行されます。
ゼロ更新メトリクス
-
更新がゼロのテーブルも含め、すべてのテーブルに対してイベントが発行されます。
-
ゼロ更新メトリクスは、次のことを区別するのに役立ちます。
- アイドル テーブル: 処理されましたが、データの変更は発生しませんでした。
- 未処理のテーブル: パイプラインによってまだ取得されていません。
-
ゼロ更新イベントは、パイプラインがアクティブに実行されていることを確認する活性信号として機能します。
サンプルクエリ
最近の進捗イベントを表示
パイプライン内のすべてのテーブルの最近の進行状況イベントを表示します。
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress.metrics.num_upserted_rows,
details:flow_progress.metrics.num_deleted_rows,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
<pipeline-id>ゲートウェイ ID に置き換えます。
テーブルごとにメトリクスを集計する
一定期間における各テーブルの合計アップサートおよび削除を計算します。
SELECT
origin.dataset_name,
COUNT(*) as event_count,
SUM(details:flow_progress.metrics.num_upserted_rows) as total_upserts,
SUM(details:flow_progress.metrics.num_deleted_rows) as total_deletes,
MIN(timestamp) as first_event,
MAX(timestamp) as last_event
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.dataset_name
ORDER BY total_upserts DESC
アイドル状態のテーブルを特定する
アイドル状態のテーブルと停止したテーブルを区別するために、更新がゼロのテーブルを検索します。
SELECT
origin.dataset_name,
origin.flow_name,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND details:flow_progress.metrics.num_upserted_rows = 0
AND details:flow_progress.metrics.num_deleted_rows = 0
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
放出頻度を監視する
イベントが予想される頻度で発行されていることを確認します。
SELECT
origin.dataset_name,
timestamp,
LEAD(timestamp) OVER (PARTITION BY origin.dataset_name ORDER BY timestamp) - timestamp as interval_seconds
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY origin.dataset_name, timestamp
トラブルシューティング
進行イベントが表示されない
イベント ログに進行状況イベントが表示されない場合は、次の手順を実行します。
pipelines.gateway.progressEventsEnabledが"true"に設定されていることを確認してください。- パイプラインの開始後、少なくとも 1 つの完全な間隔を待機します。デフォルトは 5 分です。
- パイプラインがアクティブに実行され、取り込まれていることを確認します。
- 進行状況イベントのみを表示するには、
level = 'METRICS'フィルターを含めます。
イベントが頻繁に発生したり、まれにしか発生しない
イベントが予想される頻度で発生しない場合は、次の手順に従ってください。
pipelines.gateway.progressEventEmitFrequencySeconds設定を確認し、必要に応じて調整します。
- デフォルトは5分(300秒)です。
- 有効な範囲: 30 ~ 3600 秒。必要に応じて調整してください。
パイプラインの再起動後にメトリクスがゼロになる
パイプラインの再起動後にメトリクスがゼロにリセットされた場合:
メトリクスはメモリ内のみであり、再起動、更新、または再開時にリセットされます。 これは実装を簡素化するための意図的なものです。パイプラインはすぐに新しいメトリクスの蓄積を開始します。
一部のテーブルにメトリクスがありません
一部のテーブルに進行状況イベントが表示されない場合:
- パイプライン構成でテーブルがフィルター処理されていないことを確認してください。
- CDC フェーズでは、ソース テーブルで CDC または変更追跡が有効になっていることを確認します。
- テーブルがゲートウェイ構成に含まれていることを確認します。