メインコンテンツまでスキップ

イベントログで取り込みゲートウェイの進行状況を監視する

適用対象 :チェックマーク「はい」データベースコネクタ

イベント ログを使用して、取り込みゲートウェイの進行状況をリアルタイムで監視する方法を学習します。イベント ログは、スナップショットとチェンジデータ キャプチャ ( CDC ) フェーズの両方のテーブルごとのメトリクスを提供し、パイプラインの健全性を追跡し、停止したパイプラインを特定し、自動モニタリング ソリューションを構築できるようにします。

進捗イベントでは次のことが可能になります。

  • パイプラインの完了を待たずに、テーブルごとに取り込まれたデータの量を追跡します。
  • 取り込まれた各テーブルを個別に監視して、ボトルネックや問題を特定します。
  • データの変更が発生していない場合でもイベントを受信し、パイプラインがアクティブに実行されていることを確認します。
  • ログを解析する代わりに、構造化されたイベント データを使用してアラートとダッシュボードを構築します。

進捗イベントの仕組み

ゲートウェイは、パイプライン内の各テーブルに対して、一定の間隔 (デフォルト: 5 分) でflow_progressイベントを発行します。各イベントには以下が含まれます:

  • ソーステーブルと宛先テーブルの名前。
  • 最後のイベント以降にアップサートおよび削除された行の数。
  • イベントが生成された日時。

イベントはイベント ログ テーブルで使用できますが、パブリックAPIs経由では使用できません。 SQLを使用してイベント ログ テーブルにクエリを実行し、パイプラインの動作を分析し、モニタリング ソリューションを構築できます。

アクセス進行イベント

進行状況イベントはイベント ログ テーブルに保存されます。アクセスするには:

  1. Databricks ワークスペースでゲートウェイに移動します。
  2. UI でイベントを表示するには、 「イベント ログ」 タブをクリックします。
  3. 詳細な分析を行うには、SQL を使用してイベント ログ テーブルを直接クエリします。

イベントログテーブルをクエリする

SQL を使用して進行状況イベントをクエリするには:

SQL
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) as table_name,
details:flow_progress.metrics.num_upserted_rows as rows_upserted,
details:flow_progress.metrics.num_deleted_rows as rows_deleted,
CASE
WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN 'cdc'
WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN 'snapshot'
ELSE 'unknown'
END as ingestion_phase
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

<pipeline-id>ゲートウェイ ID に置き換えます。

イベントの構造を理解する

進行状況イベントは、 METRICSログ レベルのflow_progressイベント タイプを使用します。次の例は、スナップショットおよび CDC 進行状況イベントの JSON 構造を示しています。

スナップショット進行イベント構造

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": 0
}
}
},
"maturity_level": "STABLE"
}

CDC進捗イベント構造

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3
}
}
},
"maturity_level": "STABLE"
}

イベントフィールド

次の表は、進行状況イベントの主要なフィールドについて説明しています。

フィールド

Type

説明

event_type

String

常にflow_progress

level

String

常にMETRICS

timestamp

String

イベントが生成された際の ISO 8601 タイムスタンプ。

origin.pipeline_type

String

常にINGESTION_GATEWAY

origin.pipeline_name

String

ゲートウェイの名前。

origin.dataset_name

String

取り込まれるテーブルの名前。

origin.catalog_name

String

Unity Catalogカタログ名。

origin.schema_name

String

Unity Catalogスキーマ名。

origin.flow_name

String

取り込みフェーズを示すフロー識別子。形式: 初期ロードの場合は{catalog}.{schema}.{table}_snapshot_flow 、増分変更の場合は{catalog}.{schema}.{table}_cdc_flow

origin.ingestion_source_type

String

ソース データベースの種類 (例: SQLSERVERMYSQLPOSTGRESQLORACLE )。

details:flow_progress.status

String

現在のフローのステータス。通常はRUNNING

details:flow_progress.metrics.num_upserted_rows

Integer

最後のイベント以降に挿入または更新された行数。

details:flow_progress.metrics.num_deleted_rows

Integer

最後のイベント以降に削除された行数。

maturity_level

String

常にSTABLE

メトリクスの動作

  • 行数は、累積合計ではなく、最後のイベント以降の変更を表します。
  • 各イベントの発行後にカウントはゼロにリセットされます。
  • データの変更が発生しない場合でもイベントが発行され、生存インジケーターとして機能します。

進行イベントを構成する

新しいゲートウェイでは、進行状況イベントがデフォルトで有効になっています。パイプライン設定を使用してイベントの動作をカスタマイズできます。

進行状況イベントを有効または無効にする

JSON
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}

進行状況イベントを無効にするには、 "false"に設定します。

イベントの発信頻度を調整する

JSON
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

デフォルト: 300 秒 (5 分)。有効な範囲: 30 ~ 3600 秒 (30 秒~ 1 時間)。

ゲートウェイ構成の例

次の例は、進行状況イベントが有効になっていて、5 分ごとに発行するように設定された完全なゲートウェイ構成を示しています。

Python
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}

重要な動作と制限

デフォルトの動作

  • この機能は、すべての新しいゲートウェイに対してデフォルトで有効になっています。
  • 既存のパイプラインは、次回の更新または再起動時にこの機能を自動的に受け取ります。
  • 進行状況イベントを有効にするのに必要なアクションはありません。

タイミングの考慮

  • 最初のエミッションには、パイプラインの開始後、進行イベントが表示されるまで、構成された頻度間隔 (デフォルト: 5 分) までかかる場合があります。
  • アクティブな取り込み中は、設定された頻度でイベントが発行されます。

ゼロ更新メトリクス

  • 更新がゼロのテーブルも含め、すべてのテーブルに対してイベントが発行されます。

  • ゼロ更新メトリクスは、次のことを区別するのに役立ちます。

    • アイドル テーブル: 処理されましたが、データの変更は発生しませんでした。
    • 未処理のテーブル: パイプラインによってまだ取得されていません。
  • ゼロ更新イベントは、パイプラインがアクティブに実行されていることを確認する活性信号として機能します。

サンプルクエリ

最近の進捗イベントを表示

パイプライン内のすべてのテーブルの最近の進行状況イベントを表示します。

SQL
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress.metrics.num_upserted_rows,
details:flow_progress.metrics.num_deleted_rows,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

<pipeline-id>ゲートウェイ ID に置き換えます。

テーブルごとにメトリクスを集計する

一定期間における各テーブルの合計アップサートおよび削除を計算します。

SQL
SELECT
origin.dataset_name,
COUNT(*) as event_count,
SUM(details:flow_progress.metrics.num_upserted_rows) as total_upserts,
SUM(details:flow_progress.metrics.num_deleted_rows) as total_deletes,
MIN(timestamp) as first_event,
MAX(timestamp) as last_event
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.dataset_name
ORDER BY total_upserts DESC

アイドル状態のテーブルを特定する

アイドル状態のテーブルと停止したテーブルを区別するために、更新がゼロのテーブルを検索します。

SQL
SELECT
origin.dataset_name,
origin.flow_name,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND details:flow_progress.metrics.num_upserted_rows = 0
AND details:flow_progress.metrics.num_deleted_rows = 0
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

放出頻度を監視する

イベントが予想される頻度で発行されていることを確認します。

SQL
SELECT
origin.dataset_name,
timestamp,
LEAD(timestamp) OVER (PARTITION BY origin.dataset_name ORDER BY timestamp) - timestamp as interval_seconds
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY origin.dataset_name, timestamp

トラブルシューティング

進行イベントが表示されない

イベント ログに進行状況イベントが表示されない場合は、次の手順を実行します。

  1. pipelines.gateway.progressEventsEnabled"true"に設定されていることを確認してください。
  2. パイプラインの開始後、少なくとも 1 つの完全な間隔を待機します。デフォルトは 5 分です。
  3. パイプラインがアクティブに実行され、取り込まれていることを確認します。
  4. 進行状況イベントのみを表示するには、 level = 'METRICS'フィルターを含めます。

イベントが頻繁に発生したり、まれにしか発生しない

イベントが予想される頻度で発生しない場合は、次の手順に従ってください。

pipelines.gateway.progressEventEmitFrequencySeconds設定を確認し、必要に応じて調整します。

  • デフォルトは5分(300秒)です。
  • 有効な範囲: 30 ~ 3600 秒。必要に応じて調整してください。

パイプラインの再起動後にメトリクスがゼロになる

パイプラインの再起動後にメトリクスがゼロにリセットされた場合:

メトリクスはメモリ内のみであり、再起動、更新、または再開時にリセットされます。 これは実装を簡素化するための意図的なものです。パイプラインはすぐに新しいメトリクスの蓄積を開始します。

一部のテーブルにメトリクスがありません

一部のテーブルに進行状況イベントが表示されない場合:

  1. パイプライン構成でテーブルがフィルター処理されていないことを確認してください。
  2. CDC フェーズでは、ソース テーブルで CDC または変更追跡が有効になっていることを確認します。
  3. テーブルがゲートウェイ構成に含まれていることを確認します。

その他のリソース