Skip to main content

Monitor ingestion gateway progress with event logs

Applies to: Red X icon SaaS connectors Green check icon Database connectors

Learn how to use event logs to monitor the progress of ingestion gateways in real time. Event logs provide per-table metrics for both snapshot and change data capture (CDC) phases, enabling you to track pipeline health, identify stalled pipelines, and build automated monitoring solutions.

Progress events allow you to:

  • Track how many rows and bytes have been ingested per table without waiting for pipeline completion.
  • Monitor snapshot progress for each table to estimate completion of large initial loads.
  • Estimate when a long-running snapshot will complete using per-table ETA.
  • Measure end-to-end CDC discovery latency (source commit to event-log emission) per table.
  • Monitor each ingested table individually to identify bottlenecks or issues.
  • Receive events even when no data changes occur to confirm that the pipeline is actively running.
  • Build alerts and dashboards using structured event data instead of parsing logs.

How progress events work

The gateway emits the following event types at regular intervals (default: 5 minutes) for each table in your pipeline:

  • flow_progress events report row and byte counters for snapshot and CDC flows. The metrics in these events are deltas. They reset to zero after each emission. For CDC flows, these events also include latency metrics that measure end-to-end discovery latency and upload-pipeline performance.
  • operation_progress events report snapshot progress as a percentage. Snapshot flows emit these events in addition to flow_progress. The progress percentage is cumulative. It accumulates from 0 to 100 over the lifetime of the snapshot. These events also include the estimated time remaining (estimated_completion_ms) until the snapshot finishes.

Each event includes:

  • Source and destination table names.
  • Per-table metrics: rows upserted, rows deleted (CDC only), output bytes, and progress percentage (for snapshot).
  • For CDC flows, latency metrics including discovery latency and batch processing time.
  • For snapshots, the estimated time remaining until completion.
  • When the event was generated.

Events are available in the event log table but not through public APIs. You can query the event log table using SQL to analyze pipeline behavior and build monitoring solutions.

Access progress events

Progress events are stored in the event log table. To access them:

  1. Navigate to your gateway in the Databricks workspace.
  2. Click the Event log tab to view events in the UI.
  3. Query the event log table directly using SQL for detailed analysis.

Query the event log table

To query flow_progress events for row and byte counters:

SQL
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

To query operation_progress events for snapshot progress percentage:

SQL
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

Replace <pipeline-id> with your gateway ID.

Understand the event structure

Progress events use one of the following event types with the METRICS log level:

  • flow_progress: Emitted for both snapshot and CDC flows. Reports per-table row and byte deltas.
  • operation_progress: Emitted only for snapshot flows. Reports the snapshot completion percentage for a table.

The following examples show the JSON structure for each event type:

Snapshot flow progress event structure

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}

CDC flow progress event structure

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
},
"streaming_metrics": {
"discovery_latency_ms": 12450,
"batch_processing_time_ms": 8100,
"event_time": {
"max": "2025-10-14T13:33:45.000Z"
}
}
}
},
"maturity_level": "STABLE"
}

Snapshot operation progress event structure

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"estimated_completion_ms": 1885000,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}

Event fields

The following table describes the key fields in progress events:

Field

Type

Description

event_type

String

Either flow_progress (row and byte counters for snapshot and CDC) or operation_progress (snapshot progress percentage).

level

String

Always METRICS.

timestamp

String

ISO 8601 timestamp when the event was generated.

origin.pipeline_type

String

Always INGESTION_GATEWAY.

origin.pipeline_name

String

Name of the gateway.

origin.dataset_name

String

Name of the table being ingested.

origin.catalog_name

String

Unity Catalog catalog name.

origin.schema_name

String

Unity Catalog schema name.

origin.flow_name

String

Flow identifier that indicates the ingestion phase. Format: {catalog}.{schema}.{table}_snapshot_flow for initial load or {catalog}.{schema}.{table}_cdc_flow for incremental changes.

origin.ingestion_source_type

String

Source database type (for example, SQLSERVER, MYSQL, POSTGRESQL, ORACLE).

details:flow_progress.status

String

Current flow status, typically RUNNING.

details:flow_progress.metrics.num_upserted_rows

Integer

Number of rows inserted or updated since the last event. Delta metric. Resets to zero after each emission.

details:flow_progress.metrics.num_deleted_rows

Integer

Number of rows deleted since the last event. Delta metric. Resets to zero after each emission. null for snapshot flows (snapshot does not delete).

details:flow_progress.metrics.num_output_bytes

Integer

Number of compressed bytes uploaded to a volume since the last event. Delta metric. Resets to zero after each emission. Populated for both snapshot and CDC flows.

details:flow_progress.streaming_metrics.discovery_latency_ms

Integer

Time in milliseconds between the source change at event_time.max and when this event was emitted. CDC flows only. Can be null if the source does not expose change timestamps.

details:flow_progress.streaming_metrics.batch_processing_time_ms

Integer

Time in milliseconds the gateway spent reading and uploading the most recent batch of changes. Does not include source-side lag. CDC flows only. Can be null until the first batch completes.

details:flow_progress.streaming_metrics.event_time.max

String

ISO 8601 timestamp of the most recent source change the gateway has read for this table. CDC flows only.

details:operation_progress.type

String

Operation type. CDC_SNAPSHOT for snapshot operations.

details:operation_progress.status

String

Current operation status. IN_PROGRESS while the snapshot is running, COMPLETED when it finishes. Other values include STARTED, CANCELED, and FAILED.

details:operation_progress.duration_ms

Integer

Total elapsed time of the operation in milliseconds.

details:operation_progress.progress_percent

Double

Snapshot completion percentage (0.0 - 100.0). Cumulative, not a delta. The value increases as chunks complete and reaches 100.0 when the snapshot finishes.

details:operation_progress.estimated_completion_ms

Integer

Estimated time remaining in milliseconds until the snapshot finishes. Decreases as the snapshot progresses and reaches 0 on completion. Can briefly increase if progress slows. Can be null until enough data has been processed to produce an estimate.

details:operation_progress.cdc_snapshot.target_table_name

String

Fully qualified name of the table being snapshotted.

maturity_level

String

Always STABLE.

Metric behavior

Progress metrics fall into the following categories:

Delta metrics (num_upserted_rows, num_deleted_rows, num_output_bytes):

  • Represent changes since the last event, not cumulative totals.
  • Reset to zero after each event emission.
  • Are emitted even when no data changes occur, serving as liveness indicators.
  • For snapshot flows, num_deleted_rows is null because snapshot does not produce deletes.

Cumulative metrics (progress_percent):

  • The value accumulates from 0.0 to 100.0 over the lifetime of a snapshot.
  • Updates as the snapshot progresses. For small tables, the value might jump directly from 0.0 to 100.0. For large tables, the value updates gradually and is an approximation, not an exact row count.

Point-in-time metrics (discovery_latency_ms, batch_processing_time_ms, event_time.max, estimated_completion_ms):

  • Each value reflects the state of the metric at the moment the event was emitted.
  • discovery_latency_ms and batch_processing_time_ms apply to CDC flows only. Values are reported as 0 if the result would otherwise be negative.
  • event_time.max applies to CDC flows only. The value is the timestamp of the most recent source change the gateway has read.
  • estimated_completion_ms applies to snapshot flows only. The value decreases as the snapshot progresses and reaches 0 on completion.

Configure progress events

Progress events are enabled by default for new gateways. You can customize event behavior using pipeline configuration parameters.

Enable or disable progress events

JSON
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}

Set to "false" to disable progress events.

Adjust event emission frequency

JSON
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

Default: 300 seconds (five minutes). Valid range: 30 to 3600 seconds (30 seconds to one hour). This setting controls the cadence of both flow_progress and operation_progress events.

Example gateway configuration

The following example shows a complete gateway configuration with progress events enabled and set to emit every five minutes:

Python
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}

Important behavior and limitations

Default behavior

  • The feature is enabled by default for all new gateways.
  • Existing pipelines automatically receive this feature on their next update or restart.
  • No action is required to enable progress events.

Metric availability across gateway versions

Snapshot ETA (estimated_completion_ms) and CDC latency metrics (streaming_metrics) require a gateway image from the May 2026 ingestion gateway release or later. Databricks selects the gateway image automatically. You cannot manually configure this. The features reached all production regions in May 2026.

Both newly created and existing pipelines receive the new image. Existing pipelines adopt it automatically on their next update or restart, so you don't need to migrate.

To verify whether your pipeline emits the estimated_completion_ms and streaming_metrics fields, run the following query. If both columns return rows, your gateway supports snapshot ETA and CDC latency metrics:

SQL
SELECT
MAX(CASE WHEN details:operation_progress:estimated_completion_ms IS NOT NULL
THEN timestamp END) AS last_snapshot_eta,
MAX(CASE WHEN details:flow_progress:streaming_metrics IS NOT NULL
THEN timestamp END) AS last_cdc_latency
FROM event_log('<pipeline-id>')
WHERE event_type IN ('operation_progress', 'flow_progress')
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR

If neither column has a recent timestamp after a full emission interval (default: five minutes) following a pipeline restart, contact Databricks support to confirm whether the features are enabled in your region.

Timing considerations

  • The first emission might take up to the configured frequency interval (default: five minutes) after pipeline start before progress events appear.
  • Events are emitted at the configured frequency during active ingestion.

Zero-update metrics

  • Events are emitted for all tables, including those with zero updates.
  • Zero-update metrics help distinguish between:
    • Idle tables: Processed but no data changes occurred.
    • Unprocessed tables: Not yet picked up by the pipeline.
  • Zero-update events serve as liveness signals confirming the pipeline is actively running.

Snapshot progress percentage behavior

  • Snapshot progress is calculated as (completed_chunks / total_chunks) × 100. The metric is approximate, not an exact row-level percentage.
  • Tables that are not split into multiple chunks (typically smaller tables) jump from 0.0 directly to 100.0 between emissions because there is only one chunk to track.
  • Large tables that are split into many chunks update incrementally as each chunk completes, providing a gradual progress signal that is useful for monitoring long-running initial loads.
  • A COMPLETED status always reports progress_percent = 100.0.
  • The metric does not survive a pipeline refresh or restart. After a restart, snapshot progress resumes from the last committed checkpoint and the metric continues to climb from the resumed position.

Sample queries

The following sample queries show how to monitor your gateway using row counts, output bytes, snapshot progress and ETA, and CDC latency metrics. Replace <pipeline-id> with your gateway ID in each query.

Row count queries

Volume per table (last 24 hours)

Total upserts and deletes for each table over the last 24 hours, with the ingestion phase classified from the flow name suffix. Use this as a dashboard headline to see which tables moved the most data.

SQL
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC

Recent progress events (last one hour)

Recent row counters for all tables in your pipeline. Useful for near-real-time monitoring.

SQL
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

Identify silent or stuck tables

Tables emitting events but reporting zero upserts and zero deletes for the last 60 minutes are candidates for the "stuck" status. Investigate further if the source should be changing. CDC tables that are genuinely idle (for example, overnight) also appear here.

SQL
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC

Per-table timeline with cumulative totals

Full event-by-event history for a single flow over the last 24 hours, with cumulative row counts. Replace <flow-pattern> with a SQL LIKE pattern (for example, '%customers%_cdc_flow').

SQL
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp

Output bytes queries

Bytes per table (last 24 hours)

Total bytes uploaded to a volume per table over the last 24 hours, with human-friendly MB and GB units. Sort descending to see the heaviest-volume tables.

SQL
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC

Throughput trend (MB per minute)

Per-minute time series of bytes uploaded across all flows over the last 24 hours. Render as a line chart to spot throughput patterns and stalls.

SQL
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute

Average bytes per row (wide-table or LOB detector)

Joins num_output_bytes with row counts to compute average bytes per row per table. High values usually indicate LOB or wide-schema tables that drive throughput cost. Useful for capacity planning and schema review.

SQL
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC

Snapshot progress queries

Overall snapshot progress

The following query returns a single-row summary of how many tables are completed, in progress, or queued. Results reflect the latest reported state per table across all pipeline updates in the event-log retention window, so tables completed in a prior update continue to count toward tables_completed after a refresh or restart.

SQL
WITH latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1

Per-table snapshot status board

The following query returns each table in the pipeline with its latest reported status and progress percentage. Results include tables completed in prior updates, so no tables appear as missing rows.

SQL
WITH table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC

Snapshot rows and bytes loaded in the current update

The following query combines progress percentage, rows upserted, and bytes uploaded for each table in the current snapshot run.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC

Stalled snapshot detection

The following query returns snapshot tables whose progress_percent has not changed in the last 30 minutes. Use it to identify snapshots that have stalled but are still active.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC

Snapshot ETA per table

The following query returns the current progress percentage and estimated time to completion for each table in the active snapshot run. Only tables with a status of IN_PROGRESS are returned. To include completed and queued tables, remove the status = 'IN_PROGRESS' filter.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_flow AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:estimated_completion_ms::bigint AS eta_ms,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
eta_ms,
ROUND(eta_ms / 1000.0 / 60.0, 1) AS eta_minutes,
ROUND(eta_ms / 1000.0 / 3600.0, 2) AS eta_hours,
timestamp AS last_update
FROM latest_per_flow
WHERE rn = 1
AND status = 'IN_PROGRESS'
ORDER BY eta_ms DESC NULLS LAST

CDC latency queries

CDC freshness per table

For every CDC flow, the gateway reports multiple observability fields inside streaming_metrics. They let you tell whether a table is fresh, which tables are lagging, and whether the lag is on the source side or the gateway side.

These are ingestion-gateway latencies. They measure the path from the source database through the gateway to the Unity Catalog volume. They do not include downstream applier latency from the Unity Catalog volume to the destination table. That stage is observed separately in the applier's event log.

Field

Description

event_time.max

ISO 8601 timestamp of the most recent change the gateway has read from the source database for this table. If this value stops changing between events, the source has stopped producing changes or the gateway can no longer read from the source.

discovery_latency_ms

Time in milliseconds between the source change at event_time.max and when the gateway emitted this event. Covers the full path from the source database to the Unity Catalog volume. A lower value means the data in the volume is more current.

batch_processing_time_ms

Time in milliseconds the gateway spent reading and uploading the most recent batch of changes. Does not include delays on the source database side. To estimate source-database lag, subtract this value from discovery_latency_ms.

The relationship between discovery_latency_ms and batch_processing_time_ms tells you where on the gateway path (from source database to Unity Catalog volume) the latency is concentrated:

Pattern

What it means

Both small

CDC is running and fresh.

discovery_latency_ms high, batch_processing_time_ms low

Source-side lag. The gateway is reading promptly. Commits arrived late from the source (replication delay, source log backlog, long-running source transactions).

Both high

Gateway-side lag. The upload pipeline is the bottleneck. Check gateway compute resources and the network path to the Unity Catalog volume.

discovery_latency_ms rising on one table only

Per-table source issue (DDL in flight, lock contention, blocked replication slot, schema change).

discovery_latency_ms rising on every table

Gateway-wide issue (resources, volume connectivity, source CDC log backlog affecting all captures).

event_time.max not advancing across multiple emissions

Source is idle, or the gateway has lost source-database connectivity. Verify source CDC enablement and gateway logs.

The following query returns the last 30 minutes of CDC freshness events for each CDC table. With the default five-minute interval, this produces about six rows per table. Results are sorted by discovery_latency_ms so the most lagged tables appear first.

SQL
SELECT
origin.flow_name AS flow_name,
details:flow_progress:streaming_metrics:event_time:max::string AS latest_source_commit_seen,
details:flow_progress:streaming_metrics:discovery_latency_ms::bigint AS discovery_latency_ms,
details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint AS batch_processing_time_ms,
timestamp AS emission_ts
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
ORDER BY discovery_latency_ms DESC NULLS LAST, flow_name, emission_ts DESC

CDC latency time series

The following query returns the hourly average of discovery_latency_ms and batch_processing_time_ms for each CDC flow in the last 24 hours. Use it to identify when freshness degraded. For pipelines with many CDC flows, filter by flow name to limit the result set.

SQL
SELECT
DATE_TRUNC('HOUR', timestamp) AS ts_hour,
origin.flow_name AS flow_name,
ROUND(AVG(details:flow_progress:streaming_metrics:discovery_latency_ms::bigint) / 1000.0, 2) AS avg_discovery_latency_sec,
ROUND(AVG(details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint) / 1000.0, 2) AS avg_batch_processing_sec
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('HOUR', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_hour

Troubleshooting

No progress events appear

If you don't see progress events in the event log:

  1. Check that pipelines.gateway.progressEventsEnabled is set to "true".
  2. Wait for at least one full interval after pipeline start. Default is five minutes.
  3. Check that the pipeline is actively running and ingesting.
  4. Include level = 'METRICS' filter to see only progress events.

Events appear too frequently or infrequently

If events don't appear at the expected frequency:

Check the pipelines.gateway.progressEventEmitFrequencySeconds setting and adjust as needed:

  • Default is five minutes (300 seconds).
  • Valid range: 30 to 3600 seconds. Adjust as needed.

Metrics show zero after pipeline restart

If metrics reset to zero after a pipeline restart:

Metrics are in-memory only and reset on restart, refresh, or resume. This is intentional for implementation simplicity. The pipeline will start accumulating fresh metrics immediately.

Missing metrics for some tables

If some tables don't show progress events:

  1. Make sure that the table is not filtered out in the pipeline configuration.
  2. For CDC phase, make sure that the source table has CDC or change tracking enabled.
  3. Confirm that the table is included in the gateway configuration.
  4. Note that progress_percent is only emitted in operation_progress events for snapshot flows. CDC flows do not emit operation_progress events because CDC has no concept of completion.

Missing estimated_completion_ms or streaming_metrics fields

If event_log rows exist but the estimated_completion_ms or streaming_metrics object is missing, see Metric availability across gateway versions for the diagnostic query and minimum gateway requirements.

Additional resources