Monitor and observe Auto Loader

Auto Loader pipelines require active monitoring to detect issues like growing backlogs, schema drift, corrupt data, and stalled streams before they affect downstream consumers. This page describes how to monitor key metrics, query file-level state, build observability dashboards, and troubleshoot common issues.

For production configuration details, see Configure Auto Loader for production workloads.

Prerequisites

Several monitoring workflows on this page rely on cloud_files_state() to observe per-file ingestion state — including backlog queries, latency calculations, and schema drift detection. cloud_files_state() is a table-valued function that returns file-level ingestion state for an Auto Loader checkpoint. Not all of its fields are available by default. Availability depends on your Databricks Runtime version and configuration:

  • Databricks Runtime 18.2 and above: discovery_time, processed_time, and commit_time are available automatically. On Databricks Runtime 16.4–18.1, these fields are available only when cloudFiles.cleanSource is enabled.
  • Databricks Runtime 16.4 and above with cloudFiles.cleanSource enabled: archive_time, archive_mode, and move_location are available.

Enabling cloudFiles.cleanSource has some performance overhead. Benchmark against your workloads in a pre-production environment before enabling it in production.
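
The availability rules above can be sketched as a small lookup. This is a minimal illustration, not part of any Databricks API: the function name and the (major, minor) tuple representation of the runtime version are assumptions made for this example.

```python
# Hypothetical helper: encodes the field availability rules from this section.
TIMING_FIELDS = ("discovery_time", "processed_time", "commit_time")
ARCHIVE_FIELDS = ("archive_time", "archive_mode", "move_location")


def available_optional_fields(runtime_version, clean_source_enabled):
    """Return the optional cloud_files_state() fields expected to be populated.

    runtime_version is a (major, minor) tuple such as (18, 2).
    """
    fields = []
    on_16_4_with_clean_source = runtime_version >= (16, 4) and clean_source_enabled
    # Timing fields: automatic on 18.2+, or on 16.4-18.1 with cleanSource enabled.
    if runtime_version >= (18, 2) or on_16_4_with_clean_source:
        fields.extend(TIMING_FIELDS)
    # Archive fields: 16.4+ with cleanSource enabled.
    if on_16_4_with_clean_source:
        fields.extend(ARCHIVE_FIELDS)
    return fields
```

A monitoring job could call a helper like this before running latency queries, so that it skips queries whose fields would be NULL on the current configuration.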

Additionally:

  • Annotate ingested data with the _metadata column. Capture at minimum file_path and file_modification_time. See File metadata column.
  • Enable _rescued_data and _corrupt_record columns.

Key Auto Loader metrics

The following table summarizes the most important metrics to monitor for Auto Loader pipelines. These metrics are available from StreamingQueryListener progress events, with Auto Loader-specific values exposed under each source's metrics map.

| Metric | What it tells you |
| --- | --- |
| numFilesOutstanding | Number of files in the backlog waiting to be processed |
| numBytesOutstanding | Size of the file backlog in bytes |
| approximateQueueSize | Cloud queue depth (file notification mode only) |
| numInputRows | Rows processed per batch |
| inputRowsPerSecond | Data arrival rate |
| processedRowsPerSecond | Processing throughput |
| durationMs breakdown | Where time is spent in each batch |

What to watch for

The following patterns indicate your pipeline may need attention.

  • Growing numFilesOutstanding: The backlog is building up. Your pipeline is falling behind incoming data.
  • processedRowsPerSecond < inputRowsPerSecond: The pipeline is processing data slower than it arrives.
  • Large durationMs.latestOffset: File discovery is slow. Consider switching to file events.
  • Large durationMs.addBatch: Data processing is slow. Consider scaling compute or optimizing transformations.
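
The patterns above can be checked programmatically. The following is a minimal sketch that assumes two consecutive progress events are available as plain dictionaries shaped like the JSON form of a streaming progress event, and that the first source is the Auto Loader source. The function name, the warning strings, and the 60-second default threshold are hypothetical choices for illustration.

```python
def diagnose_progress(previous, current, slow_phase_ms=60_000):
    """Return warning strings for two consecutive progress events (dicts).

    Assumed shape: {"sources": [{"metrics": {...}, "inputRowsPerSecond": ...,
    "processedRowsPerSecond": ...}], "durationMs": {...}}.
    """
    warnings = []
    prev_src, cur_src = previous["sources"][0], current["sources"][0]

    # Metric values may arrive as strings, so convert before comparing.
    prev_files = int(prev_src.get("metrics", {}).get("numFilesOutstanding", 0))
    cur_files = int(cur_src.get("metrics", {}).get("numFilesOutstanding", 0))
    if cur_files > prev_files:
        warnings.append("backlog growing")

    if cur_src.get("processedRowsPerSecond", 0.0) < cur_src.get("inputRowsPerSecond", 0.0):
        warnings.append("processing slower than arrival")

    durations = current.get("durationMs", {})
    if durations.get("latestOffset", 0) > slow_phase_ms:
        warnings.append("slow file discovery")
    if durations.get("addBatch", 0) > slow_phase_ms:
        warnings.append("slow data processing")
    return warnings
```

A single batch-to-batch increase is noisy. In practice you would likely require the condition to hold over several batches before acting on it.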

For the full metrics reference, see Auto Loader source metrics.

Query file-level state with cloud_files_state

The cloud_files_state() table-valued function provides detailed information about each file discovered by Auto Loader. The following fields are available. Fields marked as requiring Databricks Runtime 16.4 and above or 18.2 and above are only populated under the conditions described in Prerequisites.

| Field | Type | Description |
| --- | --- | --- |
| path | STRING | The path of the file |
| size | BIGINT | The size of the file in bytes |
| create_time | TIMESTAMP | When the file was created |
| discovery_time | TIMESTAMP | When Auto Loader discovered the file (Databricks Runtime 18.2 and above, or 16.4–18.1 with cloudFiles.cleanSource enabled) |
| processed_time | TIMESTAMP | When Auto Loader processed the file (Databricks Runtime 18.2 and above, or 16.4–18.1 with cloudFiles.cleanSource enabled) |
| commit_time | TIMESTAMP | When the file was committed to the checkpoint (Databricks Runtime 18.2 and above, or 16.4–18.1 with cloudFiles.cleanSource enabled) |
| archive_time | TIMESTAMP | When the file was archived (Databricks Runtime 16.4 and above, requires cloudFiles.cleanSource) |
| archive_mode | STRING | MOVE, DELETE, or NULL (Databricks Runtime 16.4 and above, requires cloudFiles.cleanSource) |
| move_location | STRING | Destination path when cloudFiles.cleanSource is MOVE (Databricks Runtime 16.4 and above) |
| ingestion_state | STRING | Current file ingestion state |

Investigate file ingestion state

The following queries cover common diagnostic scenarios.

Find all unprocessed files (the current backlog):

SQL
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

Compute average ingestion latency (time from file creation to commit):

SQL
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

Find corrupted or skipped files:

SQL
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

Track archival progress (requires cloudFiles.cleanSource):

SQL
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

Find the 20 files with the highest end-to-end latency, with discovery-to-commit latency shown alongside, to identify bottlenecks:

SQL
SELECT
  path,
  size,
  unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
  unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
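
Averages can hide slow outliers, so a latency summary often includes a high percentile as well. The sketch below assumes the rows returned by cloud_files_state() have been collected into plain dictionaries with datetime values under create_time and commit_time. The function name and the nearest-rank percentile method are choices made for this example only.

```python
import math
from datetime import datetime


def latency_summary(rows):
    """Summarize create-to-commit latency in seconds.

    rows: iterable of dicts with datetime values under "create_time" and
    "commit_time". Rows missing either value are ignored.
    """
    latencies = sorted(
        (row["commit_time"] - row["create_time"]).total_seconds()
        for row in rows
        if row.get("commit_time") is not None and row.get("create_time") is not None
    )
    if not latencies:
        return None
    n = len(latencies)
    # Nearest-rank 95th percentile: smallest value with at least 95% of data at or below it.
    p95_index = max(0, math.ceil(0.95 * n) - 1)
    return {
        "count": n,
        "avg": sum(latencies) / n,
        "p95": latencies[p95_index],
        "max": latencies[-1],
    }
```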

For the full SQL reference, see cloud_files_state table-valued function.

Monitor Auto Loader in Lakeflow Spark Declarative Pipelines

Databricks recommends using Lakeflow Spark Declarative Pipelines for production Auto Loader pipelines. To take advantage of its built-in monitoring capabilities:

  • Store the Lakeflow Spark Declarative Pipelines event log in a Delta table so it can be queried for observability data. Configure this through the pipeline's advanced settings or the API. For details, see Pipeline event log.

  • Structure your pipeline for observability. A well-structured Auto Loader pipeline in Lakeflow Spark Declarative Pipelines includes a {table}_source view (the Auto Loader source definition), a {table}_bronze streaming table (raw data ingestion with _rescued_data and _corrupt_record columns), a corrupt_records_sink that quarantines rows with unparsable data, and a {table} clean view for downstream consumption.

  • Set expectations on your bronze streaming tables to monitor for schema drift and data corruption. _rescued_data IS NULL detects unexpected schema changes and _corrupt_record IS NULL detects unparsable data. Lakeflow Spark Declarative Pipelines evaluates these expectations as data arrives and generates an observability trail. You can configure expectations to warn, drop rows, or fail the pipeline.

After creating the event_log_raw view for your pipeline, use the following queries for Auto Loader-specific metrics.

Monitor ingestion throughput per flow:

SQL
SELECT
  origin.flow_name,
  origin.update_id,
  timestamp,
  TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Monitor data backlog per flow:

SQL
SELECT
  origin.flow_name,
  timestamp,
  DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

Summarize expectation violations to detect schema drift and corrupt data:

SQL
SELECT
  origin.flow_name,
  explode(from_json(
    details:flow_progress.data_quality.expectations,
    'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
  )) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND details:flow_progress.data_quality.expectations IS NOT NULL;
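
Raw pass and fail counts are easier to alert on as a failure rate. The sketch below assumes the expectations value from an event log row is available as a JSON string with the same structure as the schema in the query above. The function name and the expectation and dataset names in the usage note are hypothetical.

```python
import json


def expectation_failure_rates(expectations_json):
    """Map each expectation name to the fraction of records that failed it."""
    rates = {}
    for exp in json.loads(expectations_json):
        passed = exp.get("passed_records") or 0
        failed = exp.get("failed_records") or 0
        total = passed + failed
        # Report 0.0 when no records were evaluated, to avoid dividing by zero.
        rates[exp["name"]] = failed / total if total else 0.0
    return rates
```

For example, an expectation named no_rescued_data with 90 passed and 10 failed records yields a rate of 0.1, which you could compare against a threshold of your choosing.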

For general Lakeflow Spark Declarative Pipelines monitoring guidance, see Monitor pipelines and Pipeline event log.

Monitor Auto Loader with Structured Streaming

When running Auto Loader outside of Lakeflow Spark Declarative Pipelines, use the following Structured Streaming monitoring approaches.

  • Implement a StreamingQueryListener to capture Auto Loader-specific metrics from each batch by reading from source.metrics.
Python
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            # Only Auto Loader sources expose the cloud files metrics.
            if "CloudFilesSource" in source.description:
                metrics = source.metrics
                # Fall back to "0" when a metric is absent from the map.
                files_outstanding = metrics.get("numFilesOutstanding", "0")
                bytes_outstanding = metrics.get("numBytesOutstanding", "0")
                rows_per_sec = source.processedRowsPerSecond
                # Push metrics to your monitoring system (for example, write to a Delta table)

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass

# Register the listener before starting the stream.
spark.streams.addListener(AutoLoaderMonitor())

Note: Processing logic in listeners can slow down query processing. Limit computation in listener callbacks and avoid synchronous external writes there. Instead, emit lightweight telemetry asynchronously or hand metrics off to a separate job for persistence.

  • Use numInputRows, inputRowsPerSecond, and processedRowsPerSecond from the source progress to compute row throughput for each batch. These fields report rows only. To estimate files per second, count files by commit_time in cloud_files_state().

  • To compute end-to-end ingestion latency, compare create_time and commit_time from cloud_files_state(). For processing latency, use the durationMs breakdown (for example, latestOffset, addBatch, and other reported batch phases) to identify which stage is the bottleneck.

  • Use df.observe() to define inline data quality metrics directly on the streaming DataFrame. Metrics are visible in StreamingQueryListener progress events under observedMetrics.

Python
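from pyspark.sql.functions import count, lit, col

# df is the streaming DataFrame returned by the Auto Loader read.
observed_df = df.observe(
    "auto_loader_quality",
    count(lit(1)).alias("total_rows"),
    # count() on a column counts only its non-NULL values.
    count(col("_rescued_data")).alias("rescued_rows"),
    count(col("_corrupt_record")).alias("corrupt_rows")
)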
  • Use .queryName() to assign a unique name to each stream, making it easier to distinguish Auto Loader streams in the Spark UI Streaming tab and in monitoring dashboards.
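
The counts reported by df.observe() in the list above are more useful for alerting once they are turned into ratios. The sketch below assumes the observedMetrics value from a progress event has been converted to a plain dictionary of dictionaries. In a PySpark listener the values typically arrive as Row objects, which could be converted with asDict() first. The function name is hypothetical. The metric names match the observe() example on this page.

```python
def quality_ratios(observed, name="auto_loader_quality"):
    """Return rescued and corrupt row fractions from an observedMetrics mapping.

    observed: dict mapping an observation name to a dict of metric values.
    Returns None when the observation is missing or covers zero rows.
    """
    metrics = observed.get(name)
    if not metrics or not metrics.get("total_rows"):
        return None
    total = metrics["total_rows"]
    return {
        "rescued_ratio": metrics.get("rescued_rows", 0) / total,
        "corrupt_ratio": metrics.get("corrupt_rows", 0) / total,
    }
```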

For the full Structured Streaming monitoring reference, see Monitoring Structured Streaming queries on Databricks.
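
The guidance in this section to keep listener callbacks lightweight can be illustrated with a bounded in-memory buffer that a background thread drains. This is a sketch under stated assumptions, not a Databricks or Spark API: the MetricsBuffer class and its methods are hypothetical, and the sink is any callable you supply to persist one metrics dictionary.

```python
import queue
import threading


class MetricsBuffer:
    """Bounded buffer that a listener callback can write to without blocking."""

    def __init__(self, sink, max_size=1000):
        self._queue = queue.Queue(maxsize=max_size)
        self._sink = sink  # callable that persists one metrics dict
        self.dropped = 0
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def offer(self, metrics):
        """Enqueue metrics. Drop them rather than block if the buffer is full."""
        try:
            self._queue.put_nowait(metrics)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def _drain(self):
        # Runs on the background thread, so slow writes do not delay the callback.
        while True:
            item = self._queue.get()
            if item is None:  # sentinel: stop the worker
                break
            self._sink(item)

    def close(self):
        """Process the remaining items, then stop the worker thread."""
        self._queue.put(None)
        self._worker.join()
```

Inside onQueryProgress, the callback would only call offer() with a small dictionary of metrics. Dropping on overflow trades completeness for never stalling the stream. Whether that trade-off is acceptable depends on your monitoring requirements.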

Build an observability dashboard

Combine data from multiple sources to build a comprehensive observability dashboard for your Auto Loader pipelines. The following table lists suggested sources and the observability data each one provides.

| Data source | Observability data |
| --- | --- |
| cloud_files_state() | File-level ingestion state: discovery, processing, commit, and archival timestamps per file |
| Lakeflow Spark Declarative Pipelines event log | Pipeline run history, per-batch flow metrics, and data quality expectation results |
| Pipeline output tables | Row counts and data volume written per ingested table |

You can then aggregate observability data into dedicated tables that serve as the foundation for dashboards and alerts:

  • Summarize pipeline run statuses (success or failure) over time, derived from event_type = 'update_progress' events.
  • Aggregate file ingestion metrics (backlog size, throughput, latency per batch), derived from cloud_files_state() and event_type = 'flow_progress' events.
  • Develop table statistics using row counts and data volume per table, derived from num_output_rows in the event log.
  • Collect debugging info from detailed error logs and expectation violations per update, derived from event_type = 'flow_progress' events with data_quality populated.

These aggregated tables can power an AI/BI dashboard and SQL alerts. Recommended dashboard panels include pipeline run status timeline, ingestion backlog trend, throughput trend, ingestion latency distribution, data quality metrics, schema evolution events, and file archival status.

Monitor schema evolution events

Use the following approaches to detect schema changes as they occur.

  • Failed records on the _rescued_data IS NULL expectation indicate schema drift, because each failure is a row with a non-NULL _rescued_data value. Query the event log for failed_records > 0 on that expectation.
  • Changes to the _schemas directory inside the configured cloudFiles.schemaLocation (or inside the checkpoint directory when no separate schema location is set) indicate that schema evolution has occurred. You can poll this directory from a separate monitoring job.
  • Do not treat an onQueryTerminated event followed by onQueryStarted for the same stream name as sufficient evidence of schema evolution on its own. Streams restart for many reasons (cluster restarts, code deploys, transient storage errors). Correlate restarts with independent signals (_schemas directory changes or _rescued_data expectation violations) before concluding that schema evolution occurred.
  • Use _metadata.file_path to identify which files introduced schema changes. Join this with cloud_files_state() on the path field to correlate schema changes with specific files and batches.
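
Polling the _schemas directory from a separate job, as suggested in the list above, can be sketched as a comparison between two directory listings. This example assumes the schema location is reachable through a local-style file path and makes no assumption about how entries in the directory are named. The function name is hypothetical.

```python
import os


def new_schema_entries(schemas_dir, known_entries):
    """Compare a _schemas directory listing against previously seen entries.

    Returns (new_entries, all_entries): a sorted list of entries that were not
    in known_entries, and the full current set to pass to the next poll.
    """
    current = set(os.listdir(schemas_dir))
    return sorted(current - set(known_entries)), current
```

A monitoring job would persist the returned set between runs and raise a schema evolution event whenever the list of new entries is non-empty.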

Use this example query to detect recent schema drift via expectation violations:

SQL
SELECT
  timestamp,
  origin.flow_name,
  exp.name AS expectation_name,
  exp.failed_records
FROM (
  SELECT
    timestamp,
    origin,
    explode(from_json(
      details:flow_progress.data_quality.expectations,
      'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
    )) AS exp
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
  AND exp.failed_records > 0
ORDER BY timestamp DESC;

Set up alerts for common issues

Use Databricks SQL alerts or pipeline notifications to detect problems before they affect downstream consumers.

The following SQL detects a growing backlog and can be used as the basis for a Databricks SQL alert. Schedule it to run periodically (for example, every 5 minutes) and alert when the result is non-empty.

SQL
-- Alert when the most recent backlog reading for a flow exceeds the threshold
WITH recent_backlog AS (
  SELECT
    origin.flow_name,
    timestamp,
    DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
    ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
  AND backlog_bytes > 1073741824; -- alert when backlog exceeds 1 GB
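
The query above compares only the latest backlog reading against a fixed threshold. A trend check over recent batches can be sketched as follows, assuming the backlog_bytes values for one flow have been collected into a list ordered from oldest to newest. The function name and the default of three consecutive increases are hypothetical.

```python
def backlog_trending_up(backlog_bytes, min_batches=3):
    """Return True if the backlog rose in each of the last min_batches intervals.

    backlog_bytes: readings for one flow, ordered oldest to newest.
    """
    recent = backlog_bytes[-(min_batches + 1):]
    if len(recent) < min_batches + 1:
        return False  # not enough history to judge a trend
    # Require a strict increase between every pair of consecutive readings.
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))
```

Requiring several consecutive increases filters out the normal sawtooth pattern where the backlog rises while files arrive and falls after each batch.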

The following table summarizes recommended alert conditions:

| What to detect | How to detect it | When to alert |
| --- | --- | --- |
| Growing backlog | numFilesOutstanding trending upward | Sustained increase over multiple batches |
| Stalled stream | No progress events | No events for N minutes (based on expected trigger interval) |
| High ingestion latency | commit_time - create_time | Exceeds your SLA threshold |
| Data quality degradation | Expectation failure rate | Increasing percentage of rows failing expectations |
| Schema evolution event | Failed records on the _rescued_data IS NULL expectation | Any failed record count above zero |
| Slow file discovery | durationMs.latestOffset | Significantly higher than baseline |
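
The stalled stream condition in the table above can be sketched as a comparison between the last progress timestamp and a multiple of the expected trigger interval. The function name and the default tolerance of three intervals are hypothetical. Choose a tolerance that fits how irregular your batches are.

```python
from datetime import datetime, timedelta, timezone


def is_stalled(last_progress_at, trigger_interval, now=None, tolerance=3):
    """Return True if no progress was reported within tolerance * trigger_interval.

    last_progress_at and now are timezone-aware datetimes.
    trigger_interval is a timedelta for the expected time between batches.
    """
    now = now or datetime.now(timezone.utc)
    return now - last_progress_at > tolerance * trigger_interval
```

For example, with a 5-minute trigger interval and the default tolerance, a stream whose last progress event is 20 minutes old is reported as stalled, while one that is 10 minutes old is not.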

Troubleshoot common issues

The following table describes common Auto Loader pipeline issues, their likely causes, and recommended actions to resolve them.

| Issue | Possible cause | Recommended action |
| --- | --- | --- |
| Backlog growing faster than processing | Undersized compute, data skew, or throttled rate limits | Scale compute, check for skew with the Spark UI, and review maxFilesPerTrigger settings to control batch size |
| Files not being discovered | File events misconfigured, permissions issue, or stream not run within 7 days | Verify external location permissions, check file events setup in the Unity Catalog UI, and ensure the stream runs at least every 7 days to avoid RocksDB state expiry |
| Stream startup taking too long | Large checkpoint state download (RocksDB) | Upgrade to Databricks Runtime 15.3 and above for async state loading, which reduces startup time by ~90% |
| Duplicate file processing | Aggressive cloudFiles.maxFileAge settings or checkpoint corruption | Use a conservative maxFileAge (90+ days minimum), verify checkpoint integrity, and avoid lifecycle policies on checkpoint storage |
| Schema evolution causing pipeline restarts | Frequent or incompatible schema changes | Review schemaEvolutionMode, switch to addNewColumnsWithTypeWidening for type promotions, or use Variant type for highly dynamic schemas |
| Corrupt data accumulating in sink | Source data quality issues | Check the _corrupt_record quarantine sink for patterns, review source data generation, and consider adding upstream validation |
| discovery_time and commit_time not populated | Running on Databricks Runtime below 18.2 without cleanSource | Upgrade to Databricks Runtime 18.2 and above or enable cloudFiles.cleanSource on Databricks Runtime 16.4–18.1 |

For additional troubleshooting, see Auto Loader FAQ.