Monitor ingestion gateway progress with event logs
Applies to: SaaS connectors
Database connectors
Learn how to use event logs to monitor the progress of ingestion gateways in real time. Event logs provide per-table metrics for both snapshot and change data capture (CDC) phases, enabling you to track pipeline health, identify stalled pipelines, and build automated monitoring solutions.
Progress events allow you to:
- Track how many rows and bytes have been ingested per table without waiting for pipeline completion.
- Monitor snapshot progress percentage for each table during large initial loads.
- Estimate when a long-running snapshot will complete using per-table ETA.
- Measure end-to-end CDC discovery latency (source commit to event-log emission) per table.
- Monitor each ingested table individually to identify bottlenecks or issues.
- Receive events even when no data changes occur to confirm that the pipeline is actively running.
- Build alerts and dashboards using structured event data instead of parsing logs.
How progress events work
The gateway emits the following event types at regular intervals (default: five minutes) for each table in your pipeline:
- flow_progress events report row and byte counters for snapshot and CDC flows. The metrics in these events are deltas: they reset to zero after each emission. For CDC flows, these events also include latency metrics that measure end-to-end discovery latency and upload-pipeline performance.
- operation_progress events report snapshot progress as a percentage. Snapshot flows emit these events in addition to flow_progress. The progress percentage is cumulative: it accumulates from 0 to 100 over the lifetime of the snapshot. These events also include the estimated time remaining (estimated_completion_ms) until the snapshot finishes.
Depending on its type, each event includes:
- Source and destination table names.
- Per-table metrics: rows upserted, rows deleted (CDC only), output bytes, and progress percentage (snapshot only).
- For CDC flows, latency metrics including discovery latency and batch processing time.
- For snapshots, the estimated time remaining until completion.
- The timestamp when the event was generated.
Events are available in the event log table but not through public APIs. You can query the event log table using SQL to analyze pipeline behavior and build monitoring solutions.
Access progress events
Progress events are stored in the event log table. To access them:
- Navigate to your gateway in the Databricks workspace.
- Click the Event log tab to view events in the UI.
- Query the event log table directly using SQL for detailed analysis.
Query the event log table
To query flow_progress events for row and byte counters:
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
To query operation_progress events for snapshot progress percentage:
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
Replace <pipeline-id> with your gateway ID.
Understand the event structure
Progress events use one of the following event types with the METRICS log level:
- flow_progress: Emitted for both snapshot and CDC flows. Reports per-table row and byte deltas.
- operation_progress: Emitted only for snapshot flows. Reports the snapshot completion percentage for a table.
The following examples show the JSON structure for each event type:
Snapshot flow progress event structure
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}
CDC flow progress event structure
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
},
"streaming_metrics": {
"discovery_latency_ms": 12450,
"batch_processing_time_ms": 8100,
"event_time": {
"max": "2025-10-14T13:33:45.000Z"
}
}
}
},
"maturity_level": "STABLE"
}
Snapshot operation progress event structure
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"estimated_completion_ms": 1885000,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}
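You might export progress events as JSON to an external monitoring tool. If so, a small helper can flatten these structures into one record per event. The following Python sketch assumes each event is already decoded into a dict shaped like the examples above. The helper names classify_phase and summarize_event and the output keys are hypothetical and are not part of any Databricks API.

```python
import json
from typing import Any, Optional


def classify_phase(flow_name: str) -> str:
    """Derive the ingestion phase from the flow-name suffix, as the SQL queries do."""
    if flow_name.endswith("_snapshot_flow"):
        return "snapshot"
    if flow_name.endswith("_cdc_flow"):
        return "cdc"
    return "unknown"


def summarize_event(event: dict[str, Any]) -> dict[str, Any]:
    """Flatten a flow_progress or operation_progress event into one record.

    Hypothetical helper: assumes the event matches the JSON structures shown above.
    """
    origin = event["origin"]
    details = event["details"]
    summary: dict[str, Any] = {
        "timestamp": event["timestamp"],
        "event_type": event["event_type"],
        "table": f'{origin["catalog_name"]}.{origin["schema_name"]}.{origin["dataset_name"]}',
        "phase": classify_phase(origin["flow_name"]),
    }
    if event["event_type"] == "flow_progress":
        metrics = details["flow_progress"]["metrics"]
        summary["rows_upserted"] = metrics["num_upserted_rows"]
        # num_deleted_rows is null (None) for snapshot flows; treat it as 0, like COALESCE in SQL.
        summary["rows_deleted"] = metrics.get("num_deleted_rows") or 0
        summary["output_bytes"] = metrics["num_output_bytes"]
        streaming: Optional[dict] = details["flow_progress"].get("streaming_metrics")
        if streaming is not None:
            summary["discovery_latency_ms"] = streaming.get("discovery_latency_ms")
            summary["batch_processing_time_ms"] = streaming.get("batch_processing_time_ms")
    elif event["event_type"] == "operation_progress":
        op = details["operation_progress"]
        summary["status"] = op["status"]
        summary["progress_percent"] = op["progress_percent"]
        # estimated_completion_ms is missing on older gateway images.
        summary["eta_ms"] = op.get("estimated_completion_ms")
    return summary


# A trimmed copy of the CDC flow progress example event.
sample = json.loads(
    """
{
  "timestamp": "2025-10-14T13:33:57.426Z",
  "event_type": "flow_progress",
  "origin": {
    "catalog_name": "main",
    "schema_name": "sales",
    "dataset_name": "customers",
    "flow_name": "main.sales.customers_cdc_flow"
  },
  "details": {
    "flow_progress": {
      "metrics": {"num_upserted_rows": 25, "num_deleted_rows": 3, "num_output_bytes": 18432},
      "streaming_metrics": {"discovery_latency_ms": 12450, "batch_processing_time_ms": 8100}
    }
  }
}
"""
)
print(summarize_event(sample))
```

Using the flow-name suffix to detect the phase mirrors the CASE expressions in the SQL queries in this article, so both approaches classify flows the same way.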
Event fields
The following table describes the key fields in progress events:
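| Field | Type | Description |
|---|---|---|
| event_type | String | Either flow_progress or operation_progress. |
| level | String | Always METRICS. |
| timestamp | String | ISO 8601 timestamp when the event was generated. |
| origin.pipeline_type | String | Always INGESTION_GATEWAY. |
| origin.pipeline_name | String | Name of the gateway. |
| origin.dataset_name | String | Name of the table being ingested. |
| origin.catalog_name | String | Unity Catalog catalog name. |
| origin.schema_name | String | Unity Catalog schema name. |
| origin.flow_name | String | Flow identifier that indicates the ingestion phase. Format: <catalog>.<schema>.<table>_snapshot_flow or <catalog>.<schema>.<table>_cdc_flow. |
| origin.ingestion_source_type | String | Source database type (for example, SQLSERVER). |
| details.flow_progress.status | String | Current flow status, typically RUNNING. |
| details.flow_progress.metrics.num_upserted_rows | Integer | Number of rows inserted or updated since the last event. Delta metric. Resets to zero after each emission. |
| details.flow_progress.metrics.num_deleted_rows | Integer | Number of rows deleted since the last event. Delta metric. Resets to zero after each emission. The value is null for snapshot flows. |
| details.flow_progress.metrics.num_output_bytes | Integer | Number of compressed bytes uploaded to a volume since the last event. Delta metric. Resets to zero after each emission. Populated for both snapshot and CDC flows. |
| details.flow_progress.streaming_metrics.discovery_latency_ms | Integer | Time in milliseconds between the source change at event_time.max and the emission of this event. CDC flows only. Reported as 0 if the result would otherwise be negative. |
| details.flow_progress.streaming_metrics.batch_processing_time_ms | Integer | Time in milliseconds the gateway spent reading and uploading the most recent batch of changes. Does not include source-side lag. CDC flows only. Reported as 0 if the result would otherwise be negative. |
| details.flow_progress.streaming_metrics.event_time.max | String | ISO 8601 timestamp of the most recent source change the gateway has read for this table. CDC flows only. |
| details.operation_progress.type | String | Operation type (for example, CDC_SNAPSHOT). |
| details.operation_progress.status | String | Current operation status (for example, IN_PROGRESS or COMPLETED). |
| details.operation_progress.duration_ms | Integer | Total elapsed time of the operation in milliseconds. |
| details.operation_progress.progress_percent | Double | Snapshot completion percentage (0.0 to 100.0). Cumulative metric. |
| details.operation_progress.estimated_completion_ms | Integer | Estimated time remaining in milliseconds until the snapshot finishes. Decreases as the snapshot progresses and reaches 0 on completion. |
| details.operation_progress.cdc_snapshot.target_table_name | String | Fully qualified name of the table being snapshotted. |
| maturity_level | String | Always STABLE. |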
Metric behavior
Progress metrics fall into the following categories:
Delta metrics (num_upserted_rows, num_deleted_rows, num_output_bytes):
- Represent changes since the last event, not cumulative totals.
- Reset to zero after each event emission.
- Are emitted even when no data changes occur, serving as liveness indicators.
- For snapshot flows, num_deleted_rows is null because snapshots do not produce deletes.
Cumulative metrics (progress_percent):
- The value accumulates from 0.0 to 100.0 over the lifetime of a snapshot.
- The value updates as the snapshot progresses. For small tables, the value might jump directly from 0.0 to 100.0. For large tables, the value updates gradually and is an approximation, not an exact row count.
Point-in-time metrics (discovery_latency_ms, batch_processing_time_ms, event_time.max, estimated_completion_ms):
- Each value reflects the state of the metric at the moment the event was emitted.
- discovery_latency_ms and batch_processing_time_ms apply to CDC flows only. Values are reported as 0 if the result would otherwise be negative.
- event_time.max applies to CDC flows only. The value is the timestamp of the most recent source change the gateway has read.
- estimated_completion_ms applies to snapshot flows only. The value decreases as the snapshot progresses and reaches 0 on completion.
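In practice, these categories mean that you sum delta metrics to get totals, while you read cumulative metrics from the latest event. The following minimal Python sketch assumes the values for a single flow are already in timestamp order. The function names are hypothetical.

```python
from itertools import accumulate


def running_totals(deltas: list[int]) -> list[int]:
    """Turn per-emission delta counters (for example, num_upserted_rows) into
    cumulative totals, like a SUM(...) OVER (ORDER BY timestamp) window."""
    return list(accumulate(deltas))


def latest_progress(progress_values: list[float]) -> float:
    """Cumulative metrics such as progress_percent are not summed: the most
    recent value already reflects overall progress. Returns 0.0 when there
    are no readings yet (a choice made for this sketch)."""
    if not progress_values:
        return 0.0
    return progress_values[-1]


# Deltas from four consecutive flow_progress events for one table.
upserts = [1200, 0, 350, 50]
print(running_totals(upserts))  # [1200, 1200, 1550, 1600]

# progress_percent from consecutive operation_progress events for one snapshot.
print(latest_progress([12.5, 40.0, 65.5]))  # 65.5
```

Summing progress_percent values would overcount. For this reason, the SQL queries in this article apply SUM only to delta counters and use ROW_NUMBER() to pick the latest operation_progress event.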
Configure progress events
Progress events are enabled by default for new gateways. You can customize event behavior using pipeline configuration parameters.
Enable or disable progress events
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}
Set to "false" to disable progress events.
Adjust event emission frequency
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}
Default: 300 seconds (five minutes). Valid range: 30 to 3600 seconds (30 seconds to one hour). This setting controls the cadence of both flow_progress and operation_progress events.
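If you generate gateway specs programmatically, you might validate the frequency before you submit it. The following Python sketch is hypothetical. The helper name is illustrative, and raising an error for out-of-range values is a choice made here, because this article does not say how the gateway treats values outside 30 to 3600 seconds. Values are strings, matching the configuration examples above.

```python
MIN_FREQUENCY_SECONDS = 30
MAX_FREQUENCY_SECONDS = 3600


def progress_event_config(enabled: bool = True, frequency_seconds: int = 300) -> dict[str, str]:
    """Build the progress-event entries for a gateway "configuration" block.

    Hypothetical helper: raises ValueError when the frequency falls outside
    the documented 30-3600 second range.
    """
    if not MIN_FREQUENCY_SECONDS <= frequency_seconds <= MAX_FREQUENCY_SECONDS:
        raise ValueError(
            f"progressEventEmitFrequencySeconds must be between {MIN_FREQUENCY_SECONDS} "
            f"and {MAX_FREQUENCY_SECONDS}, got {frequency_seconds}"
        )
    return {
        "pipelines.gateway.progressEventsEnabled": "true" if enabled else "false",
        "pipelines.gateway.progressEventEmitFrequencySeconds": str(frequency_seconds),
    }


# Emit progress events every two minutes.
print(progress_event_config(frequency_seconds=120))
```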
Example gateway configuration
The following example shows a complete gateway configuration with progress events enabled and set to emit every five minutes:
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}
Important behavior and limitations
Default behavior
- The feature is enabled by default for all new gateways.
- Existing pipelines automatically receive this feature on their next update or restart.
- No action is required to enable progress events.
Metric availability across gateway versions
Snapshot ETA (estimated_completion_ms) and CDC latency metrics (streaming_metrics) require a gateway image from the May 2026 ingestion gateway release or later. Databricks selects the gateway image automatically. You cannot manually configure this. The features reached all production regions in May 2026.
Both newly created and existing pipelines receive the new image. Existing pipelines adopt it automatically on their next update or restart, so you don't need to migrate.
To verify whether your pipeline emits the estimated_completion_ms and streaming_metrics fields, run the following query. If both columns return non-null timestamps, your gateway supports snapshot ETA and CDC latency metrics:
SELECT
MAX(CASE WHEN details:operation_progress:estimated_completion_ms IS NOT NULL
THEN timestamp END) AS last_snapshot_eta,
MAX(CASE WHEN details:flow_progress:streaming_metrics IS NOT NULL
THEN timestamp END) AS last_cdc_latency
FROM event_log('<pipeline-id>')
WHERE event_type IN ('operation_progress', 'flow_progress')
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
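If neither column returns a timestamp after a full emission interval (default: five minutes) following a pipeline restart, contact Databricks support to confirm whether the features are enabled in your region. A single null column is not necessarily a problem. last_snapshot_eta is also null when no snapshot was running in the last hour, and last_cdc_latency is also null when no CDC flow was running.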
Timing considerations
- After pipeline start, the first progress events can take up to one full emission interval (default: five minutes) to appear.
- Events are emitted at the configured frequency during active ingestion.
Zero-update metrics
- Events are emitted for all tables, including those with zero updates.
- Zero-update metrics help distinguish between:
  - Idle tables: Processed, but no data changes occurred.
  - Unprocessed tables: Not yet picked up by the pipeline.
- Zero-update events serve as liveness signals confirming the pipeline is actively running.
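To act on the idle-versus-unprocessed distinction outside SQL, you might compare the tables you expect against recent flow_progress events. The following minimal Python sketch makes several assumptions. The input shapes, the key names table, rows_upserted, and rows_deleted, and the orders and returns tables are hypothetical. The sketch also assumes that a table with no events in the window has not been picked up yet.

```python
def classify_tables(configured_tables: set[str], recent_events: list[dict]) -> dict[str, str]:
    """Label each expected table as 'active', 'idle', or 'unprocessed'.

    configured_tables: fully qualified names you expect the gateway to ingest.
    recent_events: flattened flow_progress records (hypothetical keys 'table',
    'rows_upserted', 'rows_deleted') from a recent time window.
    """
    rows_moved: dict[str, int] = {}
    for event in recent_events:
        # num_deleted_rows is null for snapshot flows, so default it to 0.
        rows = event["rows_upserted"] + (event.get("rows_deleted") or 0)
        rows_moved[event["table"]] = rows_moved.get(event["table"], 0) + rows

    labels: dict[str, str] = {}
    for table in configured_tables:
        if table not in rows_moved:
            labels[table] = "unprocessed"  # no events at all in the window
        elif rows_moved[table] == 0:
            labels[table] = "idle"  # events arrived, but no data changed
        else:
            labels[table] = "active"
    return labels


events = [
    {"table": "main.sales.customers", "rows_upserted": 25, "rows_deleted": 3},
    {"table": "main.sales.orders", "rows_upserted": 0, "rows_deleted": 0},
]
expected = {"main.sales.customers", "main.sales.orders", "main.sales.returns"}
print(sorted(classify_tables(expected, events).items()))
# [('main.sales.customers', 'active'), ('main.sales.orders', 'idle'), ('main.sales.returns', 'unprocessed')]
```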
Snapshot progress percentage behavior
- Snapshot progress is calculated as (completed_chunks / total_chunks) × 100. The metric is approximate, not an exact row-level percentage.
- Tables that are not split into multiple chunks (typically smaller tables) jump from 0.0 directly to 100.0 between emissions because there is only one chunk to track.
- Large tables that are split into many chunks update incrementally as each chunk completes. This provides a gradual progress signal that is useful for monitoring long-running initial loads.
- A COMPLETED status always reports progress_percent = 100.0.
- The in-memory metric does not survive a pipeline refresh or restart. After a restart, snapshot progress resumes from the last committed checkpoint, and the reported percentage continues to climb from that resumed position.
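The chunk-based formula explains why small tables jump straight to 100.0 while large tables move in steps. The following Python sketch only illustrates the arithmetic. Chunk counts are internal to the gateway and are not among the event fields described in this article. The function name is hypothetical.

```python
def snapshot_progress_percent(completed_chunks: int, total_chunks: int) -> float:
    """Approximate snapshot progress as (completed_chunks / total_chunks) x 100."""
    if total_chunks <= 0:
        raise ValueError("total_chunks must be positive")
    return completed_chunks / total_chunks * 100.0


# A small, unsplit table has one chunk, so it can only report 0.0 or 100.0.
print([snapshot_progress_percent(done, 1) for done in range(2)])  # [0.0, 100.0]

# A large table split into 8 chunks moves in 12.5-point steps (every second step shown).
print([snapshot_progress_percent(done, 8) for done in range(0, 9, 2)])
# [0.0, 25.0, 50.0, 75.0, 100.0]
```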
Sample queries
The following sample queries show how to monitor your gateway using row counts, output bytes, snapshot progress and ETA, and CDC latency metrics. Replace <pipeline-id> with your gateway ID in each query.
Row count queries
Volume per table (last 24 hours)
Total upserts and deletes for each table over the last 24 hours, with the ingestion phase classified from the flow name suffix. Use this as a dashboard headline to see which tables moved the most data.
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC
Recent progress events (last hour)
Recent row counters for all tables in your pipeline. Useful for near-real-time monitoring.
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
Identify silent or stuck tables
Tables emitting events but reporting zero upserts and zero deletes for the last 60 minutes are candidates for the "stuck" status. Investigate further if the source should be changing. CDC tables that are genuinely idle (for example, overnight) also appear here.
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC
Per-table timeline with cumulative totals
Full event-by-event history for a single flow over the last 24 hours, with cumulative row counts. Replace <flow-pattern> with a SQL LIKE pattern (for example, '%customers%_cdc_flow').
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp
Output bytes queries
Bytes per table (last 24 hours)
Total bytes uploaded to a volume per table over the last 24 hours, with MB and GB columns computed using 1024-based units. Results are sorted in descending order so the heaviest-volume tables appear first.
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC
Throughput trend (MB per minute)
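Per-minute time series of bytes uploaded for each flow over the last 24 hours. Render as a line chart to spot throughput patterns and stalls. Each event reports the bytes accumulated since the previous emission (five minutes by default). As a result, each bucket holds the bytes reported during that minute rather than a true per-minute rate, and minutes without an emission have no row. To approximate a rate, divide each value by the emission interval in minutes.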
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute
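To turn a single byte delta into an approximate rate, divide it by the time it covers. The following minimal Python sketch assumes the delta covers the configured emission interval (300 seconds by default). You could instead measure the gap between consecutive event timestamps. The function name is hypothetical.

```python
def mb_per_minute(delta_bytes: int, interval_seconds: float) -> float:
    """Convert one flow_progress num_output_bytes delta into an average MB/minute rate.

    Uses 1 MB = 1024 * 1024 bytes, matching the SQL queries in this article.
    interval_seconds is the time the delta covers (assumed: the emission interval).
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    megabytes = delta_bytes / (1024 * 1024)
    return megabytes / (interval_seconds / 60.0)


# 458,752,000 bytes (437.5 MB) from the snapshot example event, assumed to cover 300 seconds.
print(round(mb_per_minute(458_752_000, 300), 2))  # 87.5
```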
Average bytes per row (wide-table or LOB detector)
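Divides num_output_bytes by the row count (upserts plus deletes) to compute average bytes per row per table. Because num_output_bytes counts compressed bytes, the result reflects compressed size per row. High values usually indicate LOB or wide-schema tables that drive throughput cost. Useful for capacity planning and schema review.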
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC
Snapshot progress queries
Overall snapshot progress
The following query returns a single-row summary of how many tables are completed, in progress, or queued. Results reflect the latest reported state per table across all pipeline updates in the event-log retention window, so tables completed in a prior update continue to count toward tables_completed after a refresh or restart.
WITH latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1
Per-table snapshot status board
The following query returns each table in the pipeline with its latest reported status and progress percentage. Because results include tables completed in prior updates, tables that finished before a refresh or restart still appear on the board.
WITH table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC
Snapshot rows and bytes loaded in the current update
The following query combines progress percentage, rows upserted, and bytes uploaded for each table in the current snapshot run.
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC
Stalled snapshot detection
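The following query returns snapshot tables whose progress_percent has not changed in the last 30 minutes. Use it to identify snapshots that have stalled but are still active. Some results can be false positives. Tables that are not split into multiple chunks stay at 0.0 until they finish, so a small table can appear here while it is progressing normally. A table with only one event in the window also appears, because its minimum and maximum are equal.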
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC
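You can apply the same stall check outside SQL, for example in an alerting job that already holds recent readings. This Python sketch adds a min_samples guard so that a lone event is not flagged. The guard is an assumption of this sketch and is not part of the query above. The function name is hypothetical.

```python
def is_stalled(progress_values: list[float], min_samples: int = 2) -> bool:
    """Return True if an IN_PROGRESS snapshot's progress_percent did not move.

    progress_values: progress_percent readings for one flow within the window,
    in any order. min_samples (hypothetical) avoids flagging a single event.
    """
    if len(progress_values) < min_samples:
        return False
    highest = max(progress_values)
    return highest == min(progress_values) and highest < 100.0


print(is_stalled([42.0, 42.0, 42.0]))  # True
print(is_stalled([42.0, 47.5, 51.0]))  # False
print(is_stalled([42.0]))              # False
```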
Snapshot ETA per table
The following query returns the current progress percentage and estimated time to completion for each table in the active snapshot run. Only tables with a status of IN_PROGRESS are returned. To include completed and queued tables, remove the status = 'IN_PROGRESS' filter.
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_flow AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:estimated_completion_ms::bigint AS eta_ms,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
eta_ms,
ROUND(eta_ms / 1000.0 / 60.0, 1) AS eta_minutes,
ROUND(eta_ms / 1000.0 / 3600.0, 2) AS eta_hours,
timestamp AS last_update
FROM latest_per_flow
WHERE rn = 1
AND status = 'IN_PROGRESS'
ORDER BY eta_ms DESC NULLS LAST
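Dashboards often show a wall-clock completion time rather than a remaining duration. The following Python sketch adds estimated_completion_ms to the event timestamp, using the values from the operation_progress example event. The result is only as accurate as the gateway's estimate. The function names are hypothetical.

```python
from datetime import datetime, timedelta


def projected_completion(emission_ts: str, eta_ms: int) -> datetime:
    """Add estimated_completion_ms to the event timestamp to get a wall-clock ETA."""
    # datetime.fromisoformat() accepts a trailing "Z" only on Python 3.11+, so normalize it.
    ts = datetime.fromisoformat(emission_ts.replace("Z", "+00:00"))
    return ts + timedelta(milliseconds=eta_ms)


def format_eta(eta_ms: int) -> str:
    """Render remaining milliseconds as H:MM:SS."""
    total_seconds = eta_ms // 1000
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours}:{minutes:02d}:{seconds:02d}"


print(projected_completion("2025-10-14T13:33:14.175Z", 1885000).isoformat())
# 2025-10-14T14:04:39.175000+00:00
print(format_eta(1885000))  # 0:31:25
```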
CDC latency queries
CDC freshness per table
For every CDC flow, the gateway reports three observability fields inside streaming_metrics. Together, they tell you whether a table is fresh, which tables are lagging, and whether the lag is on the source side or the gateway side.
These are ingestion-gateway latencies. They measure the path from the source database through the gateway to the Unity Catalog volume. They do not include downstream applier latency from the Unity Catalog volume to the destination table. That stage is observed separately in the applier's event log.
| Field | Description |
|---|---|
| event_time.max | ISO 8601 timestamp of the most recent change the gateway has read from the source database for this table. If this value stops changing between events, the source has stopped producing changes or the gateway can no longer read from the source. |
| discovery_latency_ms | Time in milliseconds between the source change at event_time.max and the emission of this event. |
| batch_processing_time_ms | Time in milliseconds the gateway spent reading and uploading the most recent batch of changes. Does not include delays on the source database side. To estimate source-database lag, subtract this value from discovery_latency_ms. |
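The subtraction in the last row can be sketched in Python as follows. Clamping the result at 0 mirrors how the gateway reports negative latencies, but applying it here is an assumption of this sketch. The function name is hypothetical.

```python
def estimate_source_lag_ms(discovery_latency_ms: int, batch_processing_time_ms: int) -> int:
    """Estimate source-database lag as discovery latency minus gateway processing time.

    Clamped at 0 (an assumption) so small measurement skews do not yield negative lag.
    """
    return max(0, discovery_latency_ms - batch_processing_time_ms)


# Values from the CDC flow progress example event.
print(estimate_source_lag_ms(12450, 8100))  # 4350
```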
The relationship between discovery_latency_ms and batch_processing_time_ms tells you where on the gateway path (from source database to Unity Catalog volume) the latency is concentrated:
| Pattern | What it means |
|---|---|
| Both small | CDC is running and fresh. |
| discovery_latency_ms high, batch_processing_time_ms small | Source-side lag. The gateway is reading promptly. Commits arrived late from the source (replication delay, source log backlog, long-running source transactions). |
| Both high | Gateway-side lag. The upload pipeline is the bottleneck. Check gateway compute resources and the network path to the Unity Catalog volume. |
| discovery_latency_ms high for one table only | Per-table source issue (DDL in flight, lock contention, blocked replication slot, schema change). |
| discovery_latency_ms high for all tables | Gateway-wide issue (resources, volume connectivity, source CDC log backlog affecting all captures). |
| event_time.max not advancing | Source is idle, or the gateway has lost source-database connectivity. Verify source CDC enablement and gateway logs. |
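The per-flow rows of this table can be sketched as a small decision function over two consecutive samples. In the following Python sketch, the thresholds and the CdcSample type are hypothetical. The sketch treats any discovery latency below target as fresh. Distinguishing per-table from gateway-wide issues requires comparing results across flows, which the sketch does not do.

```python
from dataclasses import dataclass

# Hypothetical freshness targets: tune these to your own requirements.
DISCOVERY_THRESHOLD_MS = 60_000
BATCH_THRESHOLD_MS = 30_000


@dataclass
class CdcSample:
    """One CDC flow_progress reading taken from streaming_metrics."""
    flow_name: str
    event_time_max: str
    discovery_latency_ms: int
    batch_processing_time_ms: int


def diagnose(previous: CdcSample, current: CdcSample) -> str:
    """Map two consecutive samples for the same flow onto the latency patterns."""
    # event_time.max not advancing: no new source changes were read.
    if current.event_time_max == previous.event_time_max:
        return "source idle or source connectivity lost"
    # End-to-end latency within target: CDC is fresh.
    if current.discovery_latency_ms < DISCOVERY_THRESHOLD_MS:
        return "fresh"
    # Discovery latency is high; batch time shows which side is slow.
    if current.batch_processing_time_ms >= BATCH_THRESHOLD_MS:
        return "gateway-side lag"
    return "source-side lag"


# The earlier sample is hypothetical; the later one uses the CDC example event's values.
previous = CdcSample("main.sales.customers_cdc_flow", "2025-10-14T13:28:45.000Z", 9_000, 7_000)
current = CdcSample("main.sales.customers_cdc_flow", "2025-10-14T13:33:45.000Z", 12_450, 8_100)
print(diagnose(previous, current))  # fresh
```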
The following query returns the last 30 minutes of CDC freshness events for each CDC table. With the default five-minute interval, this produces about six rows per table. Results are sorted by discovery_latency_ms so the most lagged tables appear first.
SELECT
origin.flow_name AS flow_name,
details:flow_progress:streaming_metrics:event_time:max::string AS latest_source_commit_seen,
details:flow_progress:streaming_metrics:discovery_latency_ms::bigint AS discovery_latency_ms,
details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint AS batch_processing_time_ms,
timestamp AS emission_ts
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
ORDER BY discovery_latency_ms DESC NULLS LAST, flow_name, emission_ts DESC
CDC latency time series
The following query returns the hourly average of discovery_latency_ms and batch_processing_time_ms for each CDC flow in the last 24 hours. Use it to identify when freshness degraded. For pipelines with many CDC flows, filter by flow name to limit the result set.
SELECT
DATE_TRUNC('HOUR', timestamp) AS ts_hour,
origin.flow_name AS flow_name,
ROUND(AVG(details:flow_progress:streaming_metrics:discovery_latency_ms::bigint) / 1000.0, 2) AS avg_discovery_latency_sec,
ROUND(AVG(details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint) / 1000.0, 2) AS avg_batch_processing_sec
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('HOUR', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_hour
Troubleshooting
No progress events appear
If you don't see progress events in the event log:
- Check that pipelines.gateway.progressEventsEnabled is set to "true".
- Wait at least one full emission interval after pipeline start. The default is five minutes.
- Check that the pipeline is actively running and ingesting.
- Include the level = 'METRICS' filter in your query to see only progress events.
Events appear too frequently or infrequently
If events don't appear at the expected frequency:
Check the pipelines.gateway.progressEventEmitFrequencySeconds setting and adjust it as needed:
- The default is 300 seconds (five minutes).
- The valid range is 30 to 3600 seconds.
Metrics show zero after pipeline restart
If metrics reset to zero after a pipeline restart:
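Metrics are held in memory only and reset when the pipeline restarts, refreshes, or resumes. This is intentional and keeps the implementation simple. The pipeline starts accumulating fresh metrics immediately. Snapshot progress is the exception: it resumes from the last committed checkpoint.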
Missing metrics for some tables
If some tables don't show progress events:
- Make sure that the table is not filtered out in the pipeline configuration.
- For CDC phase, make sure that the source table has CDC or change tracking enabled.
- Confirm that the table is included in the gateway configuration.
- Note that progress_percent is only emitted in operation_progress events for snapshot flows. CDC flows do not emit operation_progress events because CDC has no concept of completion.
Missing estimated_completion_ms or streaming_metrics fields
If event_log rows exist but the estimated_completion_ms or streaming_metrics object is missing, see Metric availability across gateway versions for the diagnostic query and minimum gateway requirements.