Load files from Google Cloud Storage (GCS) using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in GCS (gs://).

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Auto Loader works with DBFS paths as well as direct paths to the data source.

File discovery modes

Auto Loader for Google Cloud supports two modes for detecting when there are new files: directory listing and file notification.

Note

File notification mode for Google Cloud is in Public Preview. It is supported in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above.

  • Directory listing: Identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configurations other than access to your data on GCS and is suitable for scenarios where only a few files need to be streamed in on a regular basis.
  • File notification: Currently in Public Preview. Uses GCS Notifications and Google Cloud Pub/Sub services that subscribe to file events from the input directory. Auto Loader automatically sets up the GCS Notifications and Google Cloud Pub/Sub services. File notification mode is more performant and scalable for large input directories. To use this mode, you must configure permissions for the GCS input bucket and PubSub service accounts and specify .option("cloudFiles.useNotifications", "true").

You can change the mode when you restart the stream. For example, you may want to switch to file notification mode when directory listing becomes too slow due to growth in the input directory size. For both modes, Auto Loader internally keeps track of what files have been processed in your streaming checkpoint location to provide exactly-once semantics, so you do not need to manage any state information yourself.
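For example, a minimal sketch of restarting a stream in file notification mode (the bucket path and schema below are illustrative placeholders, not values from your environment):

# Restart the stream with file notification mode enabled; Auto Loader sets up
# the GCS notification and Google Cloud Pub/Sub resources on your behalf.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .schema("id LONG, body STRING")
  .load("gs://my-bucket/input"))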

Use cloudFiles source

To use Auto Loader, create a cloudFiles source in the same way as other streaming sources. The following code starts an Auto Loader stream that writes to Delta Lake in directory listing mode:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

where:

  • <cloudFiles-option> is a configuration option described in Configuration.
  • <schema> is the file schema. Auto Loader also supports schema inference and evolution with some file formats. See Schema inference and evolution for more details.
  • <input-path> is the path in storage that is monitored for new files. Child directories of <input-path> are also monitored. <input-path> can contain file glob patterns. The glob pattern will have * appended to it; if this matches files you don’t want to ingest, you can add an additional filter through the pathGlobFilter option. If you are providing a queue for file notifications and don’t need to backfill any data, you don’t need to provide an input path.
  • <checkpoint-path> is the stream checkpoint location.
  • <trigger> is an optional trigger for the stream. The default is to execute the next micro-batch as quickly as possible. If you have data arriving at a regular interval, for example once a day, you can use Trigger Once and schedule the execution of your streams in a Databricks job. For always-on streams, Databricks recommends that you set a processing time trigger. A filled-in example follows this list.
  • <output-path> is the output stream path.
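As a filled-in sketch of the template above (the bucket paths, schema, and trigger interval are illustrative placeholders):

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("id LONG, event STRING, ts TIMESTAMP")
  .load("gs://my-bucket/raw-events"))

(df.writeStream.format("delta")
  .option("checkpointLocation", "gs://my-bucket/_checkpoints/raw-events")
  # Always-on stream: start a new micro-batch roughly every minute
  .trigger(processingTime="1 minute")
  .start("gs://my-bucket/delta/raw-events"))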

Benefits over Apache Spark FileStreamSource

In Apache Spark, you can read files incrementally using spark.readStream.format(fileFormat).load(directory). Auto Loader provides the following benefits over the file source:

  • Scalability: Auto Loader can discover billions of files efficiently. Backfills can be performed asynchronously to avoid wasting any compute resources.
  • Performance: The cost of discovering files with Auto Loader scales with the number of files that are being ingested instead of the number of directories that the files may land in. See Optimized directory listing.
  • Schema inference and evolution support: Auto Loader can detect schema drifts, notify you when schema changes happen, and rescue data that would have been otherwise ignored or lost. See Schema inference and evolution.
  • Cost: Auto Loader uses native cloud APIs to get lists of files that exist in storage. In addition, Auto Loader’s file notification mode can help reduce your cloud costs further by avoiding directory listing altogether. Auto Loader can automatically set up file notification services on storage to make file discovery much cheaper.

Optimized directory listing

Note

Available in Databricks Runtime 9.0 and above.

Auto Loader can discover files on cloud storage systems using directory listing more efficiently than other alternatives. For example, if you had files being uploaded every 5 minutes as /some/path/YYYY/MM/DD/HH/fileName, to find all the files in these directories, the Apache Spark file source would list all subdirectories in parallel, causing 1 (base directory) + 365 (per day) * 24 (per hour) = 8761 LIST API directory calls to storage. By receiving a flattened response from storage, Auto Loader reduces the number of API calls to the number of files in storage divided by the number of results returned by each API call, greatly reducing your cloud costs.

Incremental Listing

Preview

This feature is in Public Preview.

For lexicographically generated files, Auto Loader can now leverage lexical file ordering and existing optimized APIs to improve the efficiency of directory listing by listing from previously ingested files rather than listing the entire directory.

By default, Auto Loader automatically detects whether a given directory is applicable for incremental listing by checking and comparing file paths of previously completed full directory listings. To ensure eventual completeness in this auto mode, Auto Loader automatically triggers a full directory listing after completing 7 consecutive incremental listings. If you want these full listings to occur more or less frequently, you can set cloudFiles.backfillInterval to trigger asynchronous backfills at a given interval.

If you have confidence in the order of files generated in the directory, you can explicitly turn incremental listing on or off by setting cloudFiles.useIncrementalListing to true or false (the default is auto). For example, files ordered by date=... partitions can be considered lexically ordered if data is processed once a day, and file paths containing timestamps can be considered lexically ordered. You can always use cloudFiles.backfillInterval to ensure that all data is ingested when you turn on incremental listing.
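For example, a sketch that explicitly turns on incremental listing and schedules a weekly asynchronous backfill (the paths are placeholders, and this assumes the file names in the directory are lexically ordered):

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Force incremental listing instead of letting Auto Loader auto-detect it
  .option("cloudFiles.useIncrementalListing", "true")
  # Trigger a full asynchronous backfill once a week to guarantee completeness
  .option("cloudFiles.backfillInterval", "1 week")
  .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/events")
  .load("gs://my-bucket/events"))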

Schema inference and evolution

Note

Available in Databricks Runtime 8.2 and above.

Auto Loader supports schema inference and evolution with CSV, JSON, binary (binaryFile), and text file formats. See Schema inference and evolution in Auto Loader for details.
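As a rough sketch of schema inference and evolution with JSON (the paths are placeholders), you can supply a schema location instead of an explicit schema:

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Where Auto Loader stores the inferred schema and tracks subsequent changes
  .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/orders")
  # Infer exact column types instead of treating every JSON column as a string
  .option("cloudFiles.inferColumnTypes", "true")
  # Add newly discovered columns to the schema as they appear
  .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
  .load("gs://my-bucket/orders"))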

Run Auto Loader in production

Databricks recommends that you follow the streaming best practices for running Auto Loader in production.

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

File format options

With Auto Loader you can ingest JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC files. See Format options for the options for these file formats.

Common Auto Loader options

You can configure the following options for directory listing or file notification mode.

Option

cloudFiles.allowOverwrites

Type: Boolean

Whether to allow input directory file changes to overwrite existing data. Available in Databricks Runtime 7.6 and above.

Default value: false

cloudFiles.format

Type: String

The data file format in the source path. Allowed values include avro, binaryFile, csv, json, orc, parquet, and text.

Default value: None (required option)

cloudFiles.includeExistingFiles

Type: Boolean

Whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. This option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect.

Default value: true

cloudFiles.inferColumnTypes

Type: Boolean

Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON datasets. See schema inference for more details.

Default value: false

cloudFiles.maxBytesPerTrigger

Type: Byte String

The maximum number of new bytes to be processed in every trigger. You can specify a byte string such as 10g to limit each microbatch to 10 GB of data. This is a soft maximum. If you have files that are 3 GB each, Databricks processes 12 GB in a microbatch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once().

Default value: None

cloudFiles.maxFileAge

Type: Interval String

How long a file event is tracked for deduplication purposes. Databricks does not recommend tuning this parameter unless you are ingesting data at the order of millions of files an hour. See the section on How to choose maxFileAge for more details.

Default value: None

cloudFiles.resourceTags

Type: Map(String, String)

A series of key-value tag pairs to help associate and identify related resources, for example: (1)

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
  .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

Default value: None

cloudFiles.schemaEvolutionMode

Type: String

The mode for evolving the schema as new columns are discovered in the data. By default, columns are inferred as strings when inferring JSON datasets. See schema evolution for more details.

Default value: "addNewColumns" when a schema is not provided. "none" otherwise.

cloudFiles.schemaHints

Type: String

Schema information that you provide to Auto Loader during schema inference. See schema hints for more details.

Default value: None

cloudFiles.schemaLocation

Type: String

The location to store the inferred schema and subsequent changes. See schema inference for more details.

Default value: None (required when inferring the schema)

cloudFiles.validateOptions

Type: Boolean

Whether to validate Auto Loader options and return an error for unknown or inconsistent options.

Default value: true

cloudFiles.backfillInterval

Preview

This feature is in Public Preview.

Type: Interval String

Auto Loader can trigger asynchronous backfills at a given interval, for example, 1 day to backfill once a day, or 1 week to backfill once a week. File event notification systems do not guarantee 100% delivery of all uploaded files, so you can use backfills to guarantee that all files eventually get processed (available in Databricks Runtime 8.4 and Databricks Runtime 8.4 Photon and above). If you use incremental listing, you can also use regular backfills to guarantee eventual completeness (available in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above).

Default value: None

(1) Auto Loader adds the following key-value tag pairs by default on a best-effort basis:

  • vendor: Databricks
  • path: The location from where the data is loaded.
  • checkpointLocation: The location of the stream’s checkpoint.
  • streamId: A globally unique identifier for the stream.

These key names are reserved and you cannot overwrite their values.
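As an illustrative sketch combining several of the common options above (the paths, schema, and limits are placeholders):

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  # Process files already in the input path on the first run of the stream
  .option("cloudFiles.includeExistingFiles", "true")
  # Soft cap of roughly 10 GB of new data per micro-batch
  .option("cloudFiles.maxBytesPerTrigger", "10g")
  # Fail with an error on unknown or inconsistent cloudFiles options
  .option("cloudFiles.validateOptions", "true")
  .schema("id INT, name STRING, ts TIMESTAMP")
  .load("gs://my-bucket/csv-data"))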

Directory Listing options

The following options are relevant to directory listing mode.

Option

cloudFiles.useIncrementalListing

Preview

This feature is in Public Preview.

Type: String

Whether to use incremental listing rather than full listing in directory listing mode. By default, Auto Loader makes a best effort to automatically detect whether a given directory is applicable for incremental listing. You can explicitly force incremental listing or full directory listing by setting this option to true or false, respectively.

Available in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above.

Default value: auto

Available values: auto, true, false

How to choose maxFileAge

Note

Available in Databricks Runtime 8.4 and above.

Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to provide exactly-once ingestion guarantees. For high volume datasets, you can use the maxFileAge option to expire events from the checkpoint location. The minimum value that you can set for maxFileAge is "14 days". Deletes in RocksDB appear as tombstone entries, so you should expect storage usage to increase as events expire, before it starts to level off.

Warning

maxFileAge is provided as a cost control mechanism for high volume datasets, ingesting in the order of millions of files every hour. Tuning maxFileAge incorrectly can lead to data quality issues. Therefore, Databricks doesn’t recommend tuning this parameter unless absolutely required.

Tuning the maxFileAge option incorrectly can lead to Auto Loader ignoring unprocessed files, or to already processed files expiring and then being re-processed, causing duplicate data. Here are some things to consider when choosing maxFileAge:

  • If your stream restarts after a long time, file notification events that are pulled from the queue that are older than maxFileAge are ignored. Similarly, if you use directory listing, files that may have appeared during the down time that are older than maxFileAge are ignored.
  • If you use directory listing mode with maxFileAge set to, for example, "1 month", then stop your stream and restart it with maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.

The best approach to tuning maxFileAge is to start from a generous expiration, for example "1 year", and work downward to something like "9 months". If you set this option the first time you start the stream, you will not ingest data older than maxFileAge; therefore, if you want to ingest old data, do not set this option when you start your stream.
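For example, a sketch of a very high-volume stream that expires file-tracking state after one year (the paths are placeholders; leave this option unset unless you genuinely need it):

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Expire file-tracking entries older than one year from the checkpoint state
  .option("cloudFiles.maxFileAge", "1 year")
  .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/logs")
  .load("gs://my-bucket/logs"))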

Metrics

Auto Loader reports metrics at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}
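As a small sketch of reading these metrics programmatically (query is whatever your writeStream.start() call returned; lastProgress is None until the first batch completes):

# Inspect the backlog metrics reported by the cloudFiles source
progress = query.lastProgress
if progress is not None:
    metrics = progress["sources"][0].get("metrics", {})
    print(metrics.get("numFilesOutstanding"), metrics.get("numBytesOutstanding"))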

File notification options

The following options are relevant to file notification mode.

Option

cloudFiles.backfillInterval

Type: Interval String

Auto Loader can trigger asynchronous backfills at a given interval, for example 1 day to backfill once a day, or 1 week to backfill once a week. File event notification systems do not guarantee delivery of all files that have been uploaded, therefore you can use backfills to guarantee that all files eventually get processed.

Default value: None

cloudFiles.fetchParallelism

Type: Integer

Number of threads to use when fetching messages from the queueing service.

Default value: 1

cloudFiles.subscription

Type: String

The name of the Google Cloud Pub/Sub subscription. If provided, the cloud files source consumes events from this queue instead of setting up its own GCS Notification and Google Cloud Pub/Sub services.

Default value: None

cloudFiles.useNotifications

Type: Boolean

Whether to use file notification mode to determine when there are new files. If false, Auto Loader uses directory listing mode. See File discovery modes.

Default value: false

Auto Loader can automatically set up notification services for you by leveraging Google service accounts. You can configure your cluster to assume a service account by following Google service setup; the permissions that your service account needs are specified in Permissions. Alternatively, if you want Auto Loader to set up the notification services using explicit credentials, you can provide the following options for authentication.

Option

cloudFiles.client

Type: String

The client ID of the Google Service Account.

Default value: None

cloudFiles.clientEmail

Type: String

The email of the Google Service Account.

Default value: None

cloudFiles.privateKey

Type: String

The private key that’s generated for the Google Service Account.

Default value: None

cloudFiles.privateKeyId

Type: String

The id of the private key that’s generated for the Google Service Account.

Default value: None

cloudFiles.projectId

Type: String

The id of the project that the GCS bucket is in. The Google Cloud Pub/Sub subscription will also be created within this project.

Default value: None
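For example, a sketch of file notification mode with explicit service account credentials (all angle-bracket values are placeholders that you would typically read from a secret scope rather than hard-code):

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  .option("cloudFiles.useNotifications", "true")
  # Project that contains the GCS bucket; the Pub/Sub subscription is created here too
  .option("cloudFiles.projectId", "<project-id>")
  # Service account credentials used to set up the notification and Pub/Sub resources
  .option("cloudFiles.client", "<client-id>")
  .option("cloudFiles.clientEmail", "<service-account-email>")
  .option("cloudFiles.privateKey", "<private-key>")
  .option("cloudFiles.privateKeyId", "<private-key-id>")
  .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/ingest")
  .load("gs://my-bucket/ingest"))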

Permissions

You must have list and get permissions on your GCS bucket and on all the objects. For details, see the Google documentation on IAM permissions.

To use file notification mode (in Public Preview), you need to add permissions for both the GCS service account and the service account used to access the Google Cloud Pub/Sub resources.

Add the Pub/Sub Publisher role to the GCS service account. This allows the account to publish event notification messages from your GCS buckets to Google Cloud Pub/Sub.

For the service account used to access the Google Cloud Pub/Sub resources, add the following permissions:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

To do this, you can either create an IAM custom role with these permissions or assign pre-existing GCP roles to cover these permissions.

Finding the GCS Service Account

In the Google Cloud Console for the corresponding project, navigate to Cloud Storage > Settings. On that page, you should see a section titled “Cloud Storage Service Account” containing the email of the GCS service account.

GCS Service Account

Creating a Custom Google Cloud IAM Role for File Notification Mode

In the Google Cloud console for the corresponding project, navigate to IAM & Admin > Roles. Then, either create a role at the top or update an existing role. On the role creation or edit screen, click Add Permissions. A menu then appears in which you can add the desired permissions to the role.

GCP IAM Custom Roles

Cloud resource management

You can use a Scala API to manage the Google Cloud notification and Google Cloud Pub/Sub services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Sets up a Pub/Sub topic and subscription. Creates a GCS notification on the bucket and attaches it to the created topic.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Note

The CloudFilesGCPResourceManager is available in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above.

Use setUpNotificationServices(<resource-suffix>) to create:

  • a Google Cloud Pub/Sub topic with the name projects/<project>/topics/databricks-auto-ingest-<resource-suffix>
  • a Google Cloud Pub/Sub subscription with the name projects/<project>/subscriptions/databricks-auto-ingest-<resource-suffix>
  • a GCS notification configuration attached to the specified bucket and path

If there is an existing Google Cloud Pub/Sub topic or subscription with the same name, Databricks reuses the resource that already exists instead of creating a new one. If there is a single existing bucket notification pointing to the Databricks topic name, Databricks reuses it. If there are multiple notifications, however, you will need to delete all notification configurations on the bucket pointing to the topic to avoid unwanted events. This function returns a result containing a Google Cloud Pub/Sub subscription that you can pass to the cloudFiles source using .option("cloudFiles.subscription", <subscription>).

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

Frequently asked questions (FAQ)

Does Auto Loader process the file again when the file gets appended or overwritten?

Files are processed exactly once unless cloudFiles.allowOverwrites is enabled. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, Databricks suggests that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

If my data files do not arrive continuously, but at regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger-Once Structured Streaming job and schedule it to run after the anticipated file arrival time. Auto Loader works well with both infrequent and frequent updates. Even if the eventual updates are very large, Auto Loader scales well to the input size.
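For instance, a sketch of a trigger-once run that you could schedule as a daily Databricks job (the paths are placeholders):

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "gs://my-bucket/_schemas/daily")
  .load("gs://my-bucket/daily")
  .writeStream.format("delta")
  .option("checkpointLocation", "gs://my-bucket/_checkpoints/daily")
  # Process everything new since the last run, then stop
  .trigger(once=True)
  .start("gs://my-bucket/delta/daily"))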

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information of a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new stream.

Do I need to create Google Cloud event notification services beforehand?

No. If you choose file notification mode, Auto Loader automatically creates the file event notification pipeline (a GCS notification configuration, a Google Cloud Pub/Sub topic, and a Google Cloud Pub/Sub subscription) when you start the stream.

How do I clean up the event notification resources, such as Google Cloud Pub/Sub topics and subscriptions, created by Auto Loader?

You can use the cloud resource manager to list and tear down resources. You can also delete these resources manually, either in the Google Cloud Console or using Google Cloud APIs. All Google Cloud Pub/Sub resources created by Auto Loader have the prefix databricks-auto-ingest-. GCS notification configurations created by Auto Loader point to a topic with that prefix.

Can I run multiple streaming queries from different input directories on the same GCS bucket?

Yes, as long as they are not parent-child directories; for example, prod-logs/ and prod-logs/usage/ would not work because prod-logs/usage/ is a child directory of prod-logs/.

Can I use this feature when there are existing file notifications on my GCS bucket?

Yes, as long as your input directory does not conflict with the existing notification prefix (for example, the parent-child directories described above).