Configure Auto Loader for production workloads

Databricks recommends that you follow the streaming best practices for running Auto Loader in production.

Databricks recommends using Auto Loader in Delta Live Tables for incremental data ingestion. Delta Live Tables extends functionality in Apache Spark Structured Streaming and allows you to write just a few lines of declarative Python or SQL to deploy a production-quality data pipeline.

Monitoring Auto Loader

Querying files discovered by Auto Loader

Note

The cloud_files_state function is available in Databricks Runtime 10.5 and above.

Auto Loader provides a SQL API for inspecting the state of a stream. Using the cloud_files_state function, you can find metadata about files that have been discovered by an Auto Loader stream. Simply query from cloud_files_state, providing the checkpoint location associated with an Auto Loader stream.

SELECT * FROM cloud_files_state('path/to/checkpoint');

Listen to stream updates

To further monitor Auto Loader streams, Databricks recommends using Apache Spark’s Streaming Query Listener interface.

Auto Loader reports metrics to the Streaming Query Listener at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

In Databricks Runtime 10.1 and later, when you use file notification mode on AWS or Azure, the metrics also include the approximate number of file events in the cloud queue, reported as approximateQueueSize.
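As a rough illustration of how these metrics might be consumed, the sketch below parses a progress payload shaped like the sample above and extracts the backlog figures for each Auto Loader source. The helper name auto_loader_backlog and the SAMPLE_PROGRESS constant are hypothetical, introduced only for this example. It assumes the payload is available as a JSON string or an already-parsed dict, and that metric values arrive as strings, as in the sample.

```python
import json

# Sample payload copied from the progress output shown above.
SAMPLE_PROGRESS = """
{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}
"""


def auto_loader_backlog(progress):
    """Return backlog metrics for each Auto Loader source in a progress payload.

    Hypothetical helper: accepts a JSON string or an already-parsed dict.
    """
    if isinstance(progress, str):
        progress = json.loads(progress)

    backlog = []
    for source in progress.get("sources", []):
        # Auto Loader sources are described as "CloudFilesSource[...]".
        if not source.get("description", "").startswith("CloudFilesSource"):
            continue
        metrics = source.get("metrics", {})
        entry = {
            "source": source["description"],
            # Metric values are strings in the sample, so convert to int.
            "files": int(metrics.get("numFilesOutstanding", 0)),
            "bytes": int(metrics.get("numBytesOutstanding", 0)),
        }
        # Only present in file notification mode on AWS and Azure.
        if "approximateQueueSize" in metrics:
            entry["queue"] = int(metrics["approximateQueueSize"])
        backlog.append(entry)
    return backlog


backlog = auto_loader_backlog(SAMPLE_PROGRESS)
```

A function like this could be called from a Streaming Query Listener callback or a monitoring job to raise an alert when the backlog grows beyond a threshold you choose.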

Cost considerations

When running Auto Loader, your main sources of cost are compute resources and file discovery.

To reduce compute costs, Databricks recommends using Databricks Jobs to schedule Auto Loader as batch jobs with Trigger.AvailableNow (in Databricks Runtime 10.1 and later) or Trigger.Once, instead of running it continuously, as long as you don’t have low-latency requirements.

File discovery costs come from LIST operations on your storage accounts in directory listing mode, and from API requests on the subscription service and queue service in file notification mode. To reduce file discovery costs, Databricks recommends:

  • Providing a ProcessingTime trigger when running Auto Loader continuously in directory listing mode

  • Architecting file uploads to your storage account in lexical ordering to leverage Incremental Listing when possible

  • Using Databricks Runtime 9.0 or later in directory listing mode, especially for deeply nested directories

  • Leveraging file notifications when incremental listing is not possible

  • Using resource tags to tag resources created by Auto Loader to track your costs
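The trigger choice described above can be sketched in Python as follows. This is a minimal sketch under stated assumptions, not a definitive implementation: the function names choose_trigger and start_auto_loader, the "5 minutes" interval, the JSON file format, and the use of the checkpoint path as the schema location are all illustrative choices. The spark argument is assumed to be an existing SparkSession on a Databricks cluster, so start_auto_loader is defined here but not executed.

```python
def choose_trigger(continuous, interval="5 minutes"):
    """Return keyword arguments for DataStreamWriter.trigger().

    Hypothetical helper: a ProcessingTime trigger for continuous runs,
    otherwise an AvailableNow trigger for scheduled batch-style runs.
    """
    if continuous:
        return {"processingTime": interval}
    return {"availableNow": True}


def start_auto_loader(spark, source_path, target_path, checkpoint_path,
                      file_format="json", continuous=False,
                      interval="5 minutes"):
    """Start an Auto Loader stream with the chosen trigger.

    Assumes `spark` is a SparkSession on Databricks; all paths are
    placeholders supplied by the caller.
    """
    stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", file_format)
        # Assumption: store the inferred schema alongside the checkpoint.
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(source_path)
    )
    return (
        stream.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(**choose_trigger(continuous, interval))
        .start(target_path)
    )
```

Keeping the trigger selection in one small function makes it straightforward to run the same pipeline continuously in one environment and as a scheduled job in another.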

Using Trigger.AvailableNow and rate limiting

Note

Available in Databricks Runtime 10.1 for Scala only.

Available in Databricks Runtime 10.2 and above for Python and Scala.

Auto Loader can be scheduled to run in Databricks Jobs as a batch job by using Trigger.AvailableNow. The AvailableNow trigger will instruct Auto Loader to process all files that arrived before the query start time. New files that are uploaded after the stream has started will be ignored until the next trigger.

With Trigger.AvailableNow, file discovery happens asynchronously with data processing, and data can be processed across multiple micro-batches with rate limiting. By default, Auto Loader processes a maximum of 1000 files in every micro-batch. You can set cloudFiles.maxFilesPerTrigger and cloudFiles.maxBytesPerTrigger to control how many files or how many bytes are processed in a micro-batch. The file limit is a hard limit, but the byte limit is a soft limit, meaning that a micro-batch can process more bytes than the provided maxBytesPerTrigger. When both options are provided together, Auto Loader processes as many files as are needed to hit one of the limits.
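To make the interaction between the two limits concrete, here is a simplified model of how files could be grouped into micro-batches. It is a sketch of the behavior described above, not Auto Loader's actual scheduling logic. The function plan_micro_batches and its parameters are hypothetical, and the model assumes that the file that crosses the byte limit is still included in the batch, which is one way a soft limit can behave.

```python
def plan_micro_batches(file_sizes, max_files=1000, max_bytes=None):
    """Group file sizes (in bytes) into micro-batches.

    Simplified model: a batch closes as soon as it reaches the hard file
    limit or reaches/exceeds the soft byte limit, whichever comes first.
    """
    batches = []
    current = []
    current_bytes = 0
    for size in file_sizes:
        current.append(size)
        current_bytes += size
        hit_file_limit = len(current) >= max_files
        hit_byte_limit = max_bytes is not None and current_bytes >= max_bytes
        if hit_file_limit or hit_byte_limit:
            batches.append(current)
            current = []
            current_bytes = 0
    if current:
        # Remaining files form a final, smaller batch.
        batches.append(current)
    return batches


# Two large files followed by four small ones, with both limits set.
batches = plan_micro_batches([60, 60, 10, 10, 10, 10], max_files=3, max_bytes=100)
# The first batch closes on the byte limit and holds 120 bytes, more than
# max_bytes, because the limit is soft. The second closes on the file limit.
```

In this model the file count in a batch never exceeds max_files, while the byte total can exceed max_bytes by up to the size of the last file added.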

Event retention

Note

Available in Databricks Runtime 8.4 and above.

Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to provide exactly-once ingestion guarantees. For high-volume datasets, you can use the cloudFiles.maxFileAge option to expire events from the checkpoint location, which reduces your storage costs and Auto Loader startup time. The minimum value that you can set for cloudFiles.maxFileAge is "14 days". Deletes in RocksDB appear as tombstone entries, so expect storage usage to increase temporarily as events expire before it starts to level off.

Warning

cloudFiles.maxFileAge is provided as a cost control mechanism for high-volume datasets that ingest on the order of millions of files every hour. Tuning cloudFiles.maxFileAge incorrectly can lead to data quality issues. Therefore, Databricks doesn’t recommend tuning this parameter unless absolutely required.

Tuning the cloudFiles.maxFileAge option can cause Auto Loader to ignore unprocessed files, or cause already processed files to expire and then be reprocessed, which produces duplicate data. Here are some things to consider when choosing a value for cloudFiles.maxFileAge:

  • If your stream restarts after a long time, file notification events pulled from the queue that are older than cloudFiles.maxFileAge are ignored. Similarly, if you use directory listing, files that appeared during the downtime and are older than cloudFiles.maxFileAge are ignored.

  • If you use directory listing mode with cloudFiles.maxFileAge set to, for example, "1 month", then stop your stream and restart it with cloudFiles.maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.
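The second scenario above can be modeled with a few lines of Python. This is a simplified illustration of the reasoning, not how Auto Loader tracks state internally. The function files_to_reprocess, the file names, and the dates are all hypothetical, and "1 month" and "2 months" are approximated as 30 and 60 days.

```python
from datetime import datetime, timedelta


def files_to_reprocess(file_times, now, old_max_age, new_max_age):
    """Return files at risk of reprocessing after raising maxFileAge.

    Simplified model: entries older than old_max_age have expired from the
    checkpoint state, but files up to new_max_age old are considered again
    after the restart, so files in between look new to the stream.
    """
    at_risk = []
    for path, modified in file_times.items():
        age = now - modified
        if old_max_age < age <= new_max_age:
            at_risk.append(path)
    return sorted(at_risk)


now = datetime(2024, 6, 1)
file_times = {
    "a.json": now - timedelta(days=10),  # still tracked, not reprocessed
    "b.json": now - timedelta(days=45),  # expired, inside the new window
    "c.json": now - timedelta(days=90),  # older than both settings, ignored
}
result = files_to_reprocess(
    file_times,
    now,
    old_max_age=timedelta(days=30),  # approximates "1 month"
    new_max_age=timedelta(days=60),  # approximates "2 months"
)
```

Here only b.json falls in the gap between the two settings, which is the set of files the bullet above describes as being reprocessed.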

The best approach to tuning cloudFiles.maxFileAge is to start from a generous expiration, for example "1 year", and work downwards to something like "9 months". If you set this option the first time you start the stream, you will not ingest data older than cloudFiles.maxFileAge. Therefore, if you want to ingest old data, do not set this option when you first start your stream.