
Monitor ingestion gateway progress with event logs

Applies to: Database connectors

Learn how to use event logs to monitor the progress of ingestion gateways in real time. Event logs provide per-table metrics for both snapshot and change data capture (CDC) phases, enabling you to track pipeline health, identify stalled pipelines, and build automated monitoring solutions.

Progress events allow you to:

  • Track how much data has been ingested per table without waiting for pipeline completion.
  • Monitor each ingested table individually to identify bottlenecks or issues.
  • Receive events even when no data changes occur to confirm that the pipeline is actively running.
  • Build alerts and dashboards using structured event data instead of parsing logs.

How progress events work

The gateway emits flow_progress events at regular intervals (default: 5 minutes) for each table in your pipeline. Each event includes:

  • Source and destination table names.
  • The number of rows upserted and deleted since the last event.
  • The timestamp when the event was generated.

Events are available in the event log table but not through public APIs. You can query the event log table using SQL to analyze pipeline behavior and build monitoring solutions.

Access progress events

Progress events are stored in the event log table. To access them:

  1. Navigate to your gateway in the Databricks workspace.
  2. Click the Event log tab to view events in the UI.
  3. Query the event log table directly using SQL for detailed analysis.

Query the event log table

To query progress events using SQL:

SQL
SELECT
  timestamp,
  CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
  details:flow_progress.metrics.num_upserted_rows AS rows_upserted,
  details:flow_progress.metrics.num_deleted_rows AS rows_deleted,
  CASE
    WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN 'cdc'
    WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN 'snapshot'
    ELSE 'unknown'
  END AS ingestion_phase
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

Replace <pipeline-id> with your gateway ID.

Understand the event structure

Progress events use the flow_progress event type with METRICS log level. The following examples show the JSON structure for snapshot and CDC progress events:

Snapshot progress event structure

JSON
{
  "id": "01234567-89ab-cdef-0123-456789abcdef",
  "timestamp": "2025-10-14T13:33:14.175Z",
  "level": "METRICS",
  "event_type": "flow_progress",
  "origin": {
    "pipeline_type": "INGESTION_GATEWAY",
    "pipeline_name": "MyPipeline",
    "dataset_name": "customers",
    "catalog_name": "main",
    "schema_name": "sales",
    "flow_name": "main.sales.customers_snapshot_flow",
    "ingestion_source_type": "SQLSERVER"
  },
  "message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
  "details": {
    "flow_progress": {
      "status": "RUNNING",
      "metrics": {
        "num_upserted_rows": 7512704,
        "num_deleted_rows": 0
      }
    }
  },
  "maturity_level": "STABLE"
}

CDC progress event structure

JSON
{
  "id": "01234567-89ab-cdef-0123-456789abcdef",
  "timestamp": "2025-10-14T13:33:57.426Z",
  "level": "METRICS",
  "event_type": "flow_progress",
  "origin": {
    "pipeline_type": "INGESTION_GATEWAY",
    "pipeline_name": "MyPipeline",
    "dataset_name": "customers",
    "catalog_name": "main",
    "schema_name": "sales",
    "flow_name": "main.sales.customers_cdc_flow",
    "ingestion_source_type": "SQLSERVER"
  },
  "message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
  "details": {
    "flow_progress": {
      "status": "RUNNING",
      "metrics": {
        "num_upserted_rows": 25,
        "num_deleted_rows": 3
      }
    }
  },
  "maturity_level": "STABLE"
}
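The fields shown in these events are straightforward to pull out programmatically. The following is a minimal Python sketch (the event dict mirrors the CDC JSON above; the helper name `summarize_progress_event` is invented for illustration):

```python
def summarize_progress_event(event: dict) -> dict:
    """Extract the monitoring-relevant fields from a flow_progress event."""
    origin = event["origin"]
    metrics = event["details"]["flow_progress"]["metrics"]
    flow_name = origin["flow_name"]
    # The flow name suffix indicates the ingestion phase (see Event fields below).
    if flow_name.endswith("_cdc_flow"):
        phase = "cdc"
    elif flow_name.endswith("_snapshot_flow"):
        phase = "snapshot"
    else:
        phase = "unknown"
    return {
        "table": f'{origin["catalog_name"]}.{origin["schema_name"]}.{origin["dataset_name"]}',
        "phase": phase,
        "rows_upserted": metrics["num_upserted_rows"],
        "rows_deleted": metrics["num_deleted_rows"],
        "timestamp": event["timestamp"],
    }

# Example input: the CDC progress event shown above, trimmed to the fields used.
event = {
    "timestamp": "2025-10-14T13:33:57.426Z",
    "origin": {
        "catalog_name": "main",
        "schema_name": "sales",
        "dataset_name": "customers",
        "flow_name": "main.sales.customers_cdc_flow",
    },
    "details": {"flow_progress": {"metrics": {"num_upserted_rows": 25, "num_deleted_rows": 3}}},
}
print(summarize_progress_event(event))
```

The same phase logic (matching on the flow name suffix) is what the CASE expression in the earlier SQL query approximates with LIKE patterns.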

Event fields

The following list describes the key fields in progress events:

  • event_type (String): Always flow_progress.
  • level (String): Always METRICS.
  • timestamp (String): ISO 8601 timestamp when the event was generated.
  • origin.pipeline_type (String): Always INGESTION_GATEWAY.
  • origin.pipeline_name (String): Name of the gateway.
  • origin.dataset_name (String): Name of the table being ingested.
  • origin.catalog_name (String): Unity Catalog catalog name.
  • origin.schema_name (String): Unity Catalog schema name.
  • origin.flow_name (String): Flow identifier that indicates the ingestion phase. Format: {catalog}.{schema}.{table}_snapshot_flow for the initial load or {catalog}.{schema}.{table}_cdc_flow for incremental changes.
  • origin.ingestion_source_type (String): Source database type (for example, SQLSERVER, MYSQL, POSTGRESQL, ORACLE).
  • details:flow_progress.status (String): Current flow status, typically RUNNING.
  • details:flow_progress.metrics.num_upserted_rows (Integer): Number of rows inserted or updated since the last event.
  • details:flow_progress.metrics.num_deleted_rows (Integer): Number of rows deleted since the last event.
  • maturity_level (String): Always STABLE.

Metric behavior

  • Row counts represent changes since the last event, not cumulative totals.
  • Counts reset to zero after each event emission.
  • Events are emitted even when no data changes occur, serving as liveness indicators.
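Because row counts are per-interval deltas rather than running totals, any cumulative view has to sum across events. A minimal Python sketch of that accumulation (the `accumulate` helper and tuple shape are invented for illustration):

```python
from collections import defaultdict

def accumulate(events):
    """Sum per-interval deltas into cumulative per-table totals.

    Each event is a (table, upserted, deleted) tuple. Counts in progress
    events are deltas since the last emission, so cumulative totals must
    be computed by summing across events.
    """
    totals = defaultdict(lambda: {"upserted": 0, "deleted": 0})
    for table, upserted, deleted in events:
        totals[table]["upserted"] += upserted
        totals[table]["deleted"] += deleted
    return dict(totals)

events = [
    ("main.sales.customers", 7512704, 0),  # snapshot interval
    ("main.sales.customers", 25, 3),       # first CDC interval
    ("main.sales.customers", 0, 0),        # idle interval (liveness signal only)
]
print(accumulate(events))
# {'main.sales.customers': {'upserted': 7512729, 'deleted': 3}}
```

The SQL equivalent is the SUM-with-GROUP-BY aggregation shown later under Sample queries.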

Configure progress events

Progress events are enabled by default for new gateways. You can customize event behavior using pipeline configuration parameters.

Enable or disable progress events

JSON
"configuration": {
  "pipelines.gateway.progressEventsEnabled": "true"
}

Set to "false" to disable progress events.

Adjust event emission frequency

JSON
"configuration": {
  "pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

Default: 300 seconds (five minutes). Valid range: 30 to 3600 seconds (30 seconds to 1 hour).
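If you set the frequency from automation, it can be worth validating the value client-side against the documented range before deploying. This is a hypothetical helper (the constants and `resolve_emit_frequency` name are invented; gateway-side behavior for out-of-range values is not documented here):

```python
MIN_FREQUENCY_SECONDS = 30
MAX_FREQUENCY_SECONDS = 3600
DEFAULT_FREQUENCY_SECONDS = 300

def resolve_emit_frequency(configuration: dict) -> int:
    """Return the effective emission frequency for a pipeline configuration.

    Falls back to the 300-second default when the key is absent; raises
    on values outside the documented 30-3600 second range.
    """
    raw = configuration.get(
        "pipelines.gateway.progressEventEmitFrequencySeconds",
        str(DEFAULT_FREQUENCY_SECONDS),
    )
    seconds = int(raw)
    if not MIN_FREQUENCY_SECONDS <= seconds <= MAX_FREQUENCY_SECONDS:
        raise ValueError(f"frequency must be 30-3600 seconds, got {seconds}")
    return seconds

print(resolve_emit_frequency({}))  # 300 (default applies)
print(resolve_emit_frequency(
    {"pipelines.gateway.progressEventEmitFrequencySeconds": "600"}))  # 600
```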

Example gateway configuration

The following example shows a complete gateway configuration with progress events enabled and set to emit every five minutes:

Python
gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "my_gateway_pipeline",
    "catalog": "main",
    "target": "my_schema",
    "continuous": True,
    "configuration": {
        "pipelines.gateway.progressEventsEnabled": "true",
        "pipelines.gateway.progressEventEmitFrequencySeconds": "300",
    },
    # ... rest of pipeline spec
}
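Note that the configuration values in the example are JSON strings ("true", "300"), not native booleans or integers; that convention follows the examples in this article. A small hypothetical check can catch native types slipping in when the spec is built programmatically:

```python
import json

def check_configuration_types(spec: dict) -> None:
    """Verify every pipeline configuration value is a string.

    The examples in this article pass "true" and "300" as strings, so
    this sketch rejects native booleans or integers.
    """
    for key, value in spec.get("configuration", {}).items():
        if not isinstance(value, str):
            raise TypeError(
                f"configuration value for {key} must be a string, "
                f"got {type(value).__name__}"
            )

gateway_pipeline_spec = {
    "pipeline_type": "INGESTION_GATEWAY",
    "name": "my_gateway_pipeline",
    "configuration": {
        "pipelines.gateway.progressEventsEnabled": "true",
        "pipelines.gateway.progressEventEmitFrequencySeconds": "300",
    },
}
check_configuration_types(gateway_pipeline_spec)  # passes silently
print(json.dumps(gateway_pipeline_spec["configuration"], indent=2))
```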

Important behavior and limitations

Default behavior

  • The feature is enabled by default for all new gateways.
  • Existing pipelines automatically receive this feature on their next update or restart.
  • No action is required to enable progress events.

Timing considerations

  • The first emission might take up to the configured frequency interval (default: five minutes) after pipeline start before progress events appear.
  • Events are emitted at the configured frequency during active ingestion.

Zero-update metrics

  • Events are emitted for all tables, including those with zero updates.
  • Zero-update metrics help distinguish between:
    • Idle tables: Processed but no data changes occurred.
    • Unprocessed tables: Not yet picked up by the pipeline.
  • Zero-update events serve as liveness signals confirming the pipeline is actively running.
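The idle-versus-unprocessed distinction above can be implemented by checking which configured tables have emitted any event at all. A minimal Python sketch (the `classify_tables` helper and input shapes are invented for illustration):

```python
def classify_tables(configured_tables, recent_changes):
    """Split configured tables into active, idle, and unprocessed.

    recent_changes maps table name -> total rows changed in the window.
    A table that emitted an event with zero changes is idle (liveness
    confirmed); a table with no event at all has not been processed yet.
    """
    result = {"active": [], "idle": [], "unprocessed": []}
    for table in configured_tables:
        if table not in recent_changes:
            result["unprocessed"].append(table)
        elif recent_changes[table] == 0:
            result["idle"].append(table)
        else:
            result["active"].append(table)
    return result

tables = ["sales.customers", "sales.orders", "sales.returns"]
changes = {"sales.customers": 25, "sales.orders": 0}  # returns: no events yet
print(classify_tables(tables, changes))
# {'active': ['sales.customers'], 'idle': ['sales.orders'], 'unprocessed': ['sales.returns']}
```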

Sample queries

View recent progress events

View recent progress events for all tables in your pipeline:

SQL
SELECT
  origin.pipeline_name,
  origin.dataset_name,
  origin.flow_name,
  details:flow_progress.metrics.num_upserted_rows,
  details:flow_progress.metrics.num_deleted_rows,
  timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
  AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

Replace <pipeline-id> with your gateway ID.

Aggregate metrics by table

Calculate total upserts and deletes for each table over a time period:

SQL
SELECT
  origin.dataset_name,
  COUNT(*) AS event_count,
  SUM(details:flow_progress.metrics.num_upserted_rows) AS total_upserts,
  SUM(details:flow_progress.metrics.num_deleted_rows) AS total_deletes,
  MIN(timestamp) AS first_event,
  MAX(timestamp) AS last_event
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND timestamp > current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.dataset_name
ORDER BY total_upserts DESC

Identify idle tables

Find tables with zero updates to distinguish idle tables from stalled tables:

SQL
SELECT
  origin.dataset_name,
  origin.flow_name,
  timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND details:flow_progress.metrics.num_upserted_rows = 0
  AND details:flow_progress.metrics.num_deleted_rows = 0
  AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

Monitor emission frequency

Verify that events are being emitted at the expected frequency:

SQL
SELECT
  origin.dataset_name,
  timestamp,
  unix_timestamp(LEAD(timestamp) OVER (PARTITION BY origin.dataset_name ORDER BY timestamp))
    - unix_timestamp(timestamp) AS interval_seconds
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY origin.dataset_name, timestamp
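The same gap check can be done in plain Python once timestamps are fetched, which is handy for alerting when a table's gap exceeds the configured frequency. A sketch (the `emission_gaps` helper and the 1.5x slack factor are invented for illustration):

```python
from datetime import datetime

def emission_gaps(timestamps):
    """Return seconds between consecutive event timestamps for one table."""
    parsed = sorted(
        datetime.fromisoformat(ts.replace("Z", "+00:00")) for ts in timestamps
    )
    return [(b - a).total_seconds() for a, b in zip(parsed, parsed[1:])]

# Three events five minutes apart, matching the default 300-second frequency.
stamps = [
    "2025-10-14T13:33:14.175Z",
    "2025-10-14T13:38:14.175Z",
    "2025-10-14T13:43:14.175Z",
]
gaps = emission_gaps(stamps)
print(gaps)  # [300.0, 300.0]

# Flag the table as stale if any gap exceeds the configured frequency plus slack.
stale = any(gap > 300 * 1.5 for gap in gaps)
print(stale)  # False
```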

Troubleshooting

No progress events appear

If you don't see progress events in the event log:

  1. Check that pipelines.gateway.progressEventsEnabled is set to "true".
  2. Wait for at least one full interval after pipeline start. Default is five minutes.
  3. Check that the pipeline is actively running and ingesting.
  4. Include the level = 'METRICS' filter in your query to see only progress events.

Events appear too frequently or infrequently

If events don't appear at the expected frequency:

Check the pipelines.gateway.progressEventEmitFrequencySeconds setting and adjust as needed:

  • Default: 300 seconds (five minutes).
  • Valid range: 30 to 3600 seconds.

Metrics show zero after pipeline restart

If metrics reset to zero after a pipeline restart:

Metrics are kept in memory only and reset on restart, refresh, or resume. This behavior is intentional and keeps the implementation simple. The pipeline starts accumulating fresh metrics immediately after it resumes.

Missing metrics for some tables

If some tables don't show progress events:

  1. Make sure that the table is not filtered out in the pipeline configuration.
  2. For CDC phase, make sure that the source table has CDC or change tracking enabled.
  3. Confirm that the table is included in the gateway configuration.
