Monitor and observe Auto Loader
Auto Loader pipelines require active monitoring to detect issues like growing backlogs, schema drift, corrupt data, and stalled streams before they affect downstream consumers. This page describes how to monitor key metrics, query file-level state, build observability dashboards, and troubleshoot common issues.
For production configuration details, see Configure Auto Loader for production workloads.
Prerequisites
Several monitoring workflows on this page rely on cloud_files_state() to observe per-file ingestion state — including backlog queries, latency calculations, and schema drift detection. cloud_files_state() is a table-valued function that returns file-level ingestion state for an Auto Loader checkpoint. Not all of its fields are available by default. Availability depends on your Databricks Runtime version and configuration:
- Databricks Runtime 18.2 and above: discovery_time, processed_time, and commit_time are available automatically. On Databricks Runtime 16.4–18.1, these fields are available only when cloudFiles.cleanSource is enabled.
- Databricks Runtime 16.4 and above with cloudFiles.cleanSource enabled: archive_time, archive_mode, and move_location are available.
Enabling cloudFiles.cleanSource has some performance overhead. Benchmark against your workloads in a pre-production environment before enabling it in production.
Additionally:
- Annotate ingested data with the _metadata column. At minimum, capture file_path and file_modification_time. See File metadata column.
- Enable the _rescued_data and _corrupt_record columns.
Key Auto Loader metrics
The following table summarizes the most important metrics to monitor for Auto Loader pipelines. These metrics are available from StreamingQueryListener progress events, with Auto Loader-specific values exposed under each source's metrics map.
| Metric | What it tells you |
|---|---|
| numFilesOutstanding | Number of files in the backlog waiting to be processed |
| numBytesOutstanding | Size of the file backlog in bytes |
| approximateQueueSize | Cloud queue depth (file notification mode only) |
| numInputRows | Rows processed per batch |
| inputRowsPerSecond | Data arrival rate |
| processedRowsPerSecond | Processing throughput |
| durationMs | Where time is spent in each batch |
What to watch for
The following patterns indicate your pipeline may need attention.
- Growing numFilesOutstanding: The backlog is building up, and your pipeline is falling behind incoming data.
- processedRowsPerSecond lower than inputRowsPerSecond: The pipeline is processing data more slowly than it arrives.
- Large durationMs.latestOffset: File discovery is slow. Consider switching to file events.
- Large durationMs.addBatch: Data processing is slow. Consider scaling compute or optimizing transformations.
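The following minimal Python sketch checks the first pattern programmatically. It flags a backlog that grew in each of the last few batches. It assumes you have collected progress events as dictionaries (for example, by parsing each progress event's JSON) and that Auto Loader values are stored as strings under sources[i]["metrics"]. The function names and the default window of three batches are illustrative. They are not part of any Databricks API.

```python
from typing import Any, Iterable


def files_outstanding(progress: dict[str, Any]) -> int:
    """Sum numFilesOutstanding across the Auto Loader sources of one progress event."""
    total = 0
    for source in progress.get("sources") or []:
        # Assumption: Auto Loader sources mention CloudFilesSource in their description.
        if "CloudFilesSource" in source.get("description", ""):
            # Source metrics are reported as strings, so convert explicitly.
            total += int((source.get("metrics") or {}).get("numFilesOutstanding", "0"))
    return total


def backlog_is_growing(progress_events: Iterable[dict[str, Any]], window: int = 3) -> bool:
    """Return True if the backlog increased in each of the last `window` batches."""
    backlog = [files_outstanding(p) for p in progress_events]
    if len(backlog) < window + 1:
        return False  # Not enough history to call it a trend.
    recent = backlog[-(window + 1):]
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))
```

Requiring a strict increase over several consecutive batches avoids alerting on a single large file drop that the next batch clears.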
For the full metrics reference, see Auto Loader source metrics.
Query file-level state with cloud_files_state
The cloud_files_state() table-valued function provides detailed information about each file discovered by Auto Loader. The following fields are available. Fields marked as requiring Databricks Runtime 16.4 and above or 18.2 and above are only populated under the conditions described in Prerequisites.
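| Field | Type | Description |
|---|---|---|
| path | STRING | The path of the file |
| size | BIGINT | The size of the file in bytes |
| create_time | TIMESTAMP | When the file was created |
| discovery_time | TIMESTAMP | When Auto Loader discovered the file (Databricks Runtime 16.4 and above) |
| processed_time | TIMESTAMP | When Auto Loader processed the file (Databricks Runtime 16.4 and above) |
| commit_time | TIMESTAMP | When the file was committed to the checkpoint (Databricks Runtime 16.4 and above) |
| archive_time | TIMESTAMP | When the file was archived (requires cloudFiles.cleanSource, Databricks Runtime 16.4 and above) |
| archive_mode | STRING | The cleanSource mode applied to the file, such as MOVE or DELETE (requires cloudFiles.cleanSource, Databricks Runtime 16.4 and above) |
| move_location | STRING | Destination path when cloudFiles.cleanSource is set to MOVE |
| ingestion_state | STRING | Current file ingestion state |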
Investigate file ingestion state
The following queries cover common diagnostic scenarios.
Find all unprocessed files (the current backlog):
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
Compute average ingestion latency (time from file creation to commit):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
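An average can hide a long tail of slow files. The following sketch reports nearest-rank latency percentiles instead. It assumes you have collected (create_time, commit_time) pairs from cloud_files_state() into Python, for example with collect(). The function name and the default percentiles are illustrative choices.

```python
from datetime import datetime
from typing import Optional


def latency_percentiles(
    rows: list[tuple[Optional[datetime], Optional[datetime]]],
    percentiles: tuple[int, ...] = (50, 95, 99),
) -> dict[int, float]:
    """Nearest-rank percentiles of commit_time - create_time, in seconds."""
    # Skip rows where either timestamp is missing, mirroring the SQL filter above.
    latencies = sorted(
        (commit - create).total_seconds()
        for create, commit in rows
        if create is not None and commit is not None
    )
    if not latencies:
        return {}
    n = len(latencies)
    result = {}
    for p in percentiles:
        # Nearest rank = ceil(p * n / 100), computed with integer arithmetic.
        rank = max(1, -(-p * n // 100))
        result[p] = latencies[rank - 1]
    return result
```

Compare the p95 or p99 value, rather than the average, against your SLA threshold.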
Find corrupted or skipped files:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
Track archival progress (requires cloudFiles.cleanSource):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
Find files with high discovery-to-commit latency to identify bottlenecks:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
For the full SQL reference, see cloud_files_state table-valued function.
Monitor Auto Loader in Lakeflow Spark Declarative Pipelines
Databricks recommends using Lakeflow Spark Declarative Pipelines for production Auto Loader pipelines. To take advantage of its built-in monitoring capabilities:
- Store the Lakeflow Spark Declarative Pipelines event log in a Delta table so that you can query it for observability data. Configure this through the pipeline's advanced settings or the API. For details, see Pipeline event log.
- Structure your pipeline for observability. A well-structured Auto Loader pipeline in Lakeflow Spark Declarative Pipelines includes a {table}_source view (the Auto Loader source definition), a {table}_bronze streaming table (raw data ingestion with _rescued_data and _corrupt_record columns), a corrupt_records_sink that quarantines rows with unparsable data, and a {table} clean view for downstream consumption.
- Set expectations on your bronze streaming tables to monitor for schema drift and data corruption. The expectation _rescued_data IS NULL detects unexpected schema changes, and _corrupt_record IS NULL detects unparsable data. Lakeflow Spark Declarative Pipelines evaluates these expectations as data arrives and records the results in the event log. You can configure expectations to warn, drop rows, or fail the pipeline.
After creating a view named event_log_raw over your pipeline's event log (see Pipeline event log), use the following queries for Auto Loader-specific metrics.
Monitor ingestion throughput per flow:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Monitor data backlog per flow:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
List expectation results per flow to detect schema drift and corrupt data:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
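The query above returns one row per expectation per progress event. The data quality alert later on this page relies on a failure rate per expectation. A sketch like the following computes that rate in Python from the same expectations array. It assumes each event's details value has been fetched as a JSON string with the shape described by the from_json schema above. The function name is hypothetical.

```python
import json
from collections import defaultdict
from typing import Iterable


def expectation_failure_rates(details_json: Iterable[str]) -> dict[str, float]:
    """Fraction of records failing each expectation across flow_progress events."""
    passed: dict[str, int] = defaultdict(int)
    failed: dict[str, int] = defaultdict(int)
    for raw in details_json:
        details = json.loads(raw)
        # Events without data quality results contribute nothing.
        flow = details.get("flow_progress") or {}
        quality = flow.get("data_quality") or {}
        for exp in quality.get("expectations") or []:
            passed[exp["name"]] += exp.get("passed_records") or 0
            failed[exp["name"]] += exp.get("failed_records") or 0
    rates = {}
    for name in passed:
        total = passed[name] + failed[name]
        rates[name] = failed[name] / total if total else 0.0
    return rates
```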
For general Lakeflow Spark Declarative Pipelines monitoring guidance, see Monitor pipelines and Pipeline event log.
Monitor Auto Loader with Structured Streaming
When running Auto Loader outside of Lakeflow Spark Declarative Pipelines, use the following Structured Streaming monitoring approaches.
- Implement a StreamingQueryListener to capture Auto Loader-specific metrics from each batch by reading from source.metrics.
```python
from pyspark.sql.streaming import StreamingQueryListener


class AutoLoaderMonitor(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        for source in event.progress.sources:
            # Auto Loader sources identify themselves as CloudFilesSource.
            if "CloudFilesSource" in source.description:
                # source.metrics maps metric names to string values.
                metrics = source.metrics
                files_outstanding = int(metrics.get("numFilesOutstanding", "0"))
                bytes_outstanding = int(metrics.get("numBytesOutstanding", "0"))
                rows_per_sec = source.processedRowsPerSecond
                # Hand these values to your monitoring system. Keep this callback
                # fast and avoid synchronous external writes here.

    def onQueryIdle(self, event):
        pass

    def onQueryTerminated(self, event):
        pass


spark.streams.addListener(AutoLoaderMonitor())
```
Processing logic in listeners can slow down query processing. Limit computation in listener callbacks and avoid synchronous external writes there; instead emit lightweight telemetry asynchronously or hand metrics off to a separate job for persistence.
- Use numInputRows, inputRowsPerSecond, and processedRowsPerSecond from the source progress to track row throughput for each batch. To measure file throughput, count files by commit_time in cloud_files_state().
- For end-to-end ingestion latency, compare create_time and commit_time from cloud_files_state(). To find which stage of a batch is the bottleneck, use the durationMs breakdown (for example, latestOffset, addBatch, and other reported batch phases).
- Use df.observe() to define inline data quality metrics directly on the streaming DataFrame. The metrics appear in StreamingQueryListener progress events under observedMetrics.
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
- Use .queryName() to assign a unique name to each stream. This makes it easier to distinguish Auto Loader streams in the Spark UI Streaming tab and in monitoring dashboards.
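The listener guidance above recommends keeping callbacks lightweight and handing metrics off for persistence. The following minimal pure-Python sketch shows that hand-off. onQueryProgress would call offer() with the values it extracted, and a background thread passes each record to a sink. MetricsBuffer and the sink callable are hypothetical names. In practice, the sink might append to a Delta table or call your monitoring system's client. When the buffer is full, records are dropped rather than blocking the stream.

```python
import queue
import threading
from typing import Any, Callable, Optional


class MetricsBuffer:
    """Decouples fast metric capture (in the listener) from slow persistence."""

    def __init__(self, sink: Callable[[dict[str, Any]], None], maxsize: int = 10_000):
        self._queue: "queue.Queue[Optional[dict[str, Any]]]" = queue.Queue(maxsize=maxsize)
        self._sink = sink
        # Daemon thread so a forgotten close() does not keep the driver process alive.
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def offer(self, record: dict[str, Any]) -> bool:
        """Enqueue without blocking; return False (dropping the record) if full."""
        try:
            self._queue.put_nowait(record)
            return True
        except queue.Full:
            return False

    def _drain(self) -> None:
        while True:
            record = self._queue.get()
            if record is None:  # Sentinel from close(): stop after earlier records.
                break
            try:
                self._sink(record)
            except Exception:
                # Keep the worker alive if the sink fails; log this in real code.
                pass

    def close(self) -> None:
        """Wait for queued records to be written, then stop the worker."""
        self._queue.put(None)
        self._worker.join()
```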
For the full Structured Streaming monitoring reference, see Monitoring Structured Streaming queries on Databricks.
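The following sketch applies the throughput and durationMs guidance above to a single batch. It reads one progress event as a dictionary (the JSON form of a streaming progress event). It reports the ratio of processing rate to arrival rate and the slowest batch phase. The function name and the returned keys are illustrative assumptions.

```python
from typing import Any, Optional


def summarize_batch(progress: dict[str, Any]) -> dict[str, Any]:
    """Throughput ratio and slowest durationMs phase for one progress event."""
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    # Below 1.0, data arrived faster than this batch processed it.
    ratio: Optional[float] = processed_rate / input_rate if input_rate else None

    # triggerExecution spans the whole batch, so leave it out when ranking phases.
    durations = {
        phase: ms
        for phase, ms in (progress.get("durationMs") or {}).items()
        if phase != "triggerExecution"
    }
    slowest = max(durations, key=durations.get) if durations else None
    return {
        "numInputRows": progress.get("numInputRows", 0),
        "throughput_ratio": ratio,
        "slowest_phase": slowest,
        "slowest_phase_ms": durations[slowest] if slowest is not None else None,
    }
```

A slowest phase of latestOffset points to file discovery. A slowest phase of addBatch points to processing, which matches the guidance in What to watch for.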
Build an observability dashboard
Combine data from multiple sources to build a comprehensive observability dashboard for your Auto Loader pipelines. The following table lists suggested data sources and the observability data each one provides.
| Data source | Observability data |
|---|---|
| cloud_files_state() | File-level ingestion state: discovery, processing, commit, and archival timestamps per file |
| Lakeflow Spark Declarative Pipelines event log | Pipeline run history, per-batch flow metrics, and data quality expectation results |
| Pipeline output tables | Row counts and data volume written per ingested table |
You can then aggregate observability data into dedicated tables that serve as the foundation for dashboards and alerts:
- Summarize pipeline run statuses (success or failure) over time, derived from event_type = 'update_progress' events.
- Aggregate file ingestion metrics (backlog size, throughput, and latency per batch), derived from cloud_files_state() and event_type = 'flow_progress' events.
- Track table statistics, such as row counts and data volume per table, derived from num_output_rows in the event log.
- Collect debugging information, such as detailed error logs and expectation violations per update, derived from event_type = 'flow_progress' events with data_quality populated.
These aggregated tables can power an AI/BI dashboard and SQL alerts. Recommended dashboard panels include pipeline run status timeline, ingestion backlog trend, throughput trend, ingestion latency distribution, data quality metrics, schema evolution events, and file archival status.
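The following sketch shows the first aggregation. It reduces update_progress events to the final state of each update and counts the outcomes. It assumes each event has already been extracted into an (update_id, timestamp, state) tuple, with state taken from details:update_progress.state. It also treats COMPLETED, FAILED, and CANCELED as terminal states. Check these values against your own event log.

```python
from collections import Counter
from datetime import datetime

# Assumed terminal update states; verify against your event log.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def run_status_summary(events: list[tuple[str, datetime, str]]) -> Counter:
    """Count updates by their latest terminal state."""
    latest: dict[str, tuple[datetime, str]] = {}
    for update_id, ts, state in events:
        if state not in TERMINAL_STATES:
            continue  # Ignore intermediate states such as RUNNING.
        if update_id not in latest or ts > latest[update_id][0]:
            latest[update_id] = (ts, state)
    return Counter(state for _, state in latest.values())
```

Updates that are still running do not appear in the counts until they reach a terminal state.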
Monitor schema evolution events
Use the following approaches to detect schema changes as they occur.
- Failed records on your _rescued_data IS NULL expectation indicate schema drift, because each failure is a row whose _rescued_data value is non-NULL. Query the event log for failed_records > 0 on that expectation.
- Changes to the _schemas directory inside the configured cloudFiles.schemaLocation (or inside the checkpoint location, if no separate schema location is set) indicate that schema evolution has occurred. You can poll this directory from a separate monitoring job.
- Do not treat an onQueryTerminated event followed by onQueryStarted for the same stream name as sufficient evidence of schema evolution on its own. Streams restart for many reasons, such as cluster restarts, code deploys, and transient storage errors. Before concluding that schema evolution occurred, correlate restarts with independent signals, such as _schemas directory changes or _rescued_data expectation violations.
- Use _metadata.file_path to identify which files introduced schema changes. Join it with cloud_files_state() on the path field to correlate schema changes with specific files and batches.
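A separate monitoring job could poll the _schemas directory with a sketch like the following. It assumes the schema location is readable as a local or mounted file system path. For cloud storage URIs, replace os.listdir with your storage client's listing call (for example, dbutils.fs.ls in a notebook). The function names are hypothetical.

```python
import os
from typing import Iterable


def list_schema_versions(schema_location: str) -> set[str]:
    """List entries under <schema_location>/_schemas (empty if it does not exist yet)."""
    schemas_dir = os.path.join(schema_location, "_schemas")
    if not os.path.isdir(schemas_dir):
        return set()
    return set(os.listdir(schemas_dir))


def new_schema_versions(current: Iterable[str], seen: set[str]) -> set[str]:
    """Entries present on this poll that were absent on previous polls."""
    return set(current) - seen
```

On each poll, call new_schema_versions(list_schema_versions(location), seen). Raise an alert if the result is non-empty, and then add the result to seen.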
Use this example query to detect recent schema drift via expectation violations:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
Set up alerts for common issues
Use Databricks SQL alerts or pipeline notifications to detect problems before they affect downstream consumers.
The following SQL detects a growing backlog and can be used as the basis for a Databricks SQL alert. Schedule it to run periodically (for example, every 5 minutes) and alert when the result is non-empty.
-- Alert when the latest backlog for any flow exceeds a threshold
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824; -- alert when backlog exceeds 1 GB
The following table summarizes recommended alert conditions:
| What to detect | How to detect it | When to alert |
|---|---|---|
| Growing backlog | numFilesOutstanding or backlog_bytes | Sustained increase over multiple batches |
| Stalled stream | No progress events | No events for N minutes (based on expected trigger interval) |
| High ingestion latency | commit_time minus create_time from cloud_files_state() | Exceeds your SLA threshold |
| Data quality degradation | Expectation failure rate | Increasing percentage of rows failing expectations |
| Schema evolution event | Failed records on the _rescued_data expectation | Any non-zero violation count |
| Slow file discovery | durationMs.latestOffset | Significantly higher than baseline |
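The stalled-stream condition can be evaluated from the timestamp of the most recent progress event for a flow, for example max(timestamp) over flow_progress events in event_log_raw. The following sketch treats a stream as stalled when no event has arrived within a multiple of the expected trigger interval. The function name and the default grace factor of 3 are illustrative assumptions that you should tune for your workload.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stalled(
    last_event_time: Optional[datetime],
    trigger_interval: timedelta,
    now: Optional[datetime] = None,
    grace_factor: float = 3.0,
) -> bool:
    """True if no progress event arrived within grace_factor trigger intervals.

    last_event_time and now must both be timezone-aware (or both naive).
    """
    if last_event_time is None:
        return True  # The stream has never reported progress.
    if now is None:
        now = datetime.now(timezone.utc)
    return now - last_event_time > trigger_interval * grace_factor
```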
Troubleshoot common issues
The following table describes common Auto Loader pipeline issues, their likely causes, and recommended actions to resolve them.
Issue | Possible cause | Recommended action |
|---|---|---|
Backlog growing faster than processing | Undersized compute, data skew, or throttled rate limits | Scale compute, check for skew with the Spark UI, and review |
Files not being discovered | File events misconfigured, permissions issue, or stream not run within 7 days | Verify external location permissions, check file events setup in the Unity Catalog UI, and ensure the stream runs at least every 7 days to avoid RocksDB state expiry |
Stream startup taking too long | Large checkpoint state download (RocksDB) | Upgrade to Databricks Runtime 15.3 and above for async state loading, which reduces startup time by ~90% |
Duplicate file processing | Aggressive | Use a conservative |
Schema evolution causing pipeline restarts | Frequent or incompatible schema changes | Review |
Corrupt data accumulating in sink | Source data quality issues | Check the |
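| discovery_time, processed_time, or commit_time is NULL in cloud_files_state() | Running on Databricks Runtime below 18.2 without cloudFiles.cleanSource enabled | Upgrade to Databricks Runtime 18.2 and above or enable cloudFiles.cleanSource |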
For additional troubleshooting, see Auto Loader FAQ.