Load files from S3 using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in S3.

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Requirements

Databricks Runtime 7.2 or above.

If you created streams using Databricks Runtime 7.1 or below, see Changes in default option values and compatibility and Cloud resource management.

New file detection modes

Auto Loader supports two modes for detecting when there are new files: directory listing and file notification.

  • Directory listing: Identifies new files by parallel listing of the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configuration and is suitable for scenarios where only a few files need to be streamed in on a regular basis. Directory listing mode is the default for Auto Loader in Databricks Runtime 7.2 and above.
  • File notification: Uses AWS SNS and SQS services that subscribe to file events from the input directory. Auto Loader automatically sets up the AWS SNS and SQS services. File notification mode is more performant and scalable for large input directories. To use this mode, you must configure permissions for the AWS SNS and SQS services and specify .option("cloudFiles.useNotifications","true").

You can change modes when you restart the stream. For example, you may want to switch to file notification mode when directory listing becomes too slow due to growth in the input directory. For both modes, Auto Loader internally keeps track of which files have been processed to provide exactly-once semantics, so you do not need to manage any state information yourself.

Use cloudFiles source

You use a cloudFiles source in the same way as other streaming sources:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .start(<output-path>)

where:

  • <cloudFiles-option> is one of the configuration options described in Configuration.

  • <schema> is the file schema.

    Note

    On Databricks Runtime 7.3 LTS and above, if the file format is text or binaryFile you don’t need to provide the schema.

  • <input-path> is the path in S3 that is monitored for new files. Subdirectories of <input-path> are also monitored. <input-path> can contain file glob patterns.

  • <checkpoint-path> is the output stream checkpoint location.

  • <output-path> is the output stream path.
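
For example, here is a minimal sketch that ingests JSON files into a Delta table; the schema string, bucket, and paths are illustrative placeholders, not part of the documented API:

# A minimal sketch: ingest JSON files from a hypothetical bucket into a Delta table.
# The schema string, bucket, and paths are illustrative placeholders.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .schema("event_id STRING, event_time TIMESTAMP") \
  .load("s3a://my-bucket/auto-loader/input/")

df.writeStream.format("delta") \
  .option("checkpointLocation", "s3a://my-bucket/auto-loader/_checkpoints/ingest") \
  .start("s3a://my-bucket/auto-loader/output/")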

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

Important

Some default option values changed in Databricks Runtime 7.2. If you are using Auto Loader on Databricks Runtime 7.1 or below, see Changes in default option values and compatibility.

  • cloudFiles.allowOverwrite (Boolean, default: false): Whether to allow input directory file changes to overwrite existing data. Available in Databricks Runtime 7.6 and above.
  • cloudFiles.format (String, no default; required): The data file format in the source path.
  • cloudFiles.includeExistingFiles (Boolean, default: true): Whether to include existing files in the input path in the streaming processing, versus processing only new files that arrive after the notifications are set up. This option is respected only when you start a stream for the first time; changing its value after a stream restart has no effect. This option has no effect in directory listing mode.
  • cloudFiles.maxFilesPerTrigger (Integer, default: 1000): Maximum number of new files to be processed in every trigger. This option has no effect when used with Trigger.Once().
  • cloudFiles.maxBytesPerTrigger (Byte String, for example 10g; default: none): Maximum number of new bytes to be processed in every trigger. You can specify a byte string such as 10g to limit each micro-batch to 10 GB of data. This is a soft maximum: if your files are 3 GB each, Databricks processes 12 GB in a micro-batch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to whichever of the two limits is reached first. This option has no effect when used with Trigger.Once().
  • cloudFiles.useNotifications (Boolean, default: false): Whether to use file notification mode to determine when there are new files. If false, use directory listing mode. See New file detection modes.
  • cloudFiles.validateOptions (Boolean, default: true): Whether to validate Auto Loader options and return an error for unknown or inconsistent options.
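
As a sketch, the two rate-limiting options can be combined on a single reader; the format, schema string, and path below are illustrative placeholders:

# A sketch: cap each micro-batch at 500 files or roughly 10 GB, whichever is reached first.
# The format, schema string, and path are illustrative placeholders.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.maxFilesPerTrigger", 500) \
  .option("cloudFiles.maxBytesPerTrigger", "10g") \
  .schema("event_id STRING, event_time TIMESTAMP") \
  .load("s3a://my-bucket/auto-loader/input/")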

Provide one of the following authentication options only if you choose cloudFiles.useNotifications = true:

  • cloudFiles.region (String, default: none): The region where the source S3 bucket resides and where the AWS SNS and SQS services are created.
  • cloudFiles.queueUrl (String, default: none): The URL of the SQS queue. If provided, the cloud files source directly consumes events from this queue instead of setting up its own AWS SNS and SQS services.
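
For example, a sketch of a reader that turns on file notification mode and lets Auto Loader create the SNS and SQS resources in the bucket's region; the region, schema string, and path are illustrative placeholders:

# A sketch: enable file notification mode; Auto Loader creates the SNS and SQS
# resources in the specified region. Region, schema, and path are illustrative.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .option("cloudFiles.region", "us-west-2") \
  .schema("event_id STRING, event_time TIMESTAMP") \
  .load("s3a://my-bucket/auto-loader/input/")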

Changes in default option values and compatibility

The default values of the following Auto Loader options changed in Databricks Runtime 7.2 to the values listed in Configuration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Auto Loader streams started on Databricks Runtime 7.1 and below have the following default option values:

  • cloudFiles.useNotifications is true
  • cloudFiles.includeExistingFiles is false
  • cloudFiles.validateOptions is false

To ensure compatibility with existing applications, these default option values do not change when you run your existing Auto Loader streams on Databricks Runtime 7.2 or above; the streams will have the same behavior after the upgrade.

Metrics

Auto Loader reports metrics at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [ {
    "description" : "CloudFilesSource[/path/to/source]",
    "metrics" : {
      "numFilesOutstanding" : "238",
      "numBytesOutstanding" : "163939124006"
    }
  } ]
}
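
The same metrics can also be read programmatically from the streaming query's progress. A sketch, assuming query is the StreamingQuery handle returned by writeStream.start():

# A sketch: read the backlog metrics from the latest progress of a running query.
# `query` is the StreamingQuery handle returned by writeStream.start().
progress = query.lastProgress
if progress is not None:
    metrics = progress["sources"][0]["metrics"]
    print(metrics.get("numFilesOutstanding"), metrics.get("numBytesOutstanding"))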

Permissions

To use file notification mode, attach the following JSON policy document to your IAM user or role.

If you are unable to set up the permissions specified in the JSON policy document, you can ask an administrator to perform the setup for you using the Cloud resource management Scala API. An administrator can provide you with the queue URL, which you can provide directly as .option("cloudFiles.queueUrl", <queue-url>) to the cloudFiles source. With this configuration, you need only reduced permissions. See Appendix: Reduced permissions after initial setup for details.
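
For example, a sketch of a reader that consumes from an administrator-provided queue; the queue URL, format, schema string, and path are illustrative placeholders:

# A sketch: consume file events from an existing SQS queue that an administrator
# set up, so the stream needs only the reduced permissions in the appendix.
# The queue URL, format, schema, and path are illustrative placeholders.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .option("cloudFiles.queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/databricks-auto-ingest-example") \
  .schema("event_id STRING, event_time TIMESTAMP") \
  .load("s3a://my-bucket/auto-loader/input/")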

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DatabricksAutoLoaderSetup",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketNotification",
                "s3:PutBucketNotification",
                "sns:ListSubscriptionsByTopic",
                "sns:GetTopicAttributes",
                "sns:SetTopicAttributes",
                "sns:CreateTopic",
                "sns:TagResource",
                "sns:Publish",
                "sns:Subscribe",
                "sqs:CreateQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:SetQueueAttributes",
                "sqs:TagQueue",
                "sqs:ChangeMessageVisibility",
                "sqs:ChangeMessageVisibilityBatch"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket-name>",
                "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
                "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
            ]
        },
        {
            "Sid": "DatabricksAutoLoaderList",
            "Effect": "Allow",
            "Action": [
                "sqs:ListQueues",
                "sqs:ListQueueTags",
                "sns:ListTopics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DatabricksAutoLoaderTeardown",
            "Effect": "Allow",
            "Action": [
                "sns:Unsubscribe",
                "sns:DeleteTopic",
                "sqs:DeleteQueue"
            ],
            "Resource": [
                "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
                "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
            ]
        }
    ]
}

where:

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running %fs mounts.
  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. If you don’t want to specify the region, use *.
  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. If you don’t want to specify the account number, use *.

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when creating SQS and SNS services. Since Databricks sets up the notification services in the initial run of the stream, you can use a policy with reduced permissions after the initial run (for example, stop the stream and then restart it). See Appendix: Reduced permissions after initial setup for details.

Note

The preceding policy is concerned only with the permissions needed to set up the file notification services, namely S3 bucket notifications and the SNS and SQS services; it assumes you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the DatabricksAutoLoaderSetup statement in the JSON document:

  • s3:ListBucket
  • s3:GetObject

Cloud resource management

You can use a Scala API to manage the AWS SNS and SQS services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

Important

If you have used Auto Loader in Databricks Runtime 7.1 and below, update your IAM policy using the JSON policy document in Permissions. There are new statements in the policy for Databricks Runtime 7.2—DatabricksAutoLoaderList and DatabricksAutoLoaderTeardown—which specify the additional permissions required by the Scala API.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
  .newManager
  .option("cloudFiles.region", <region>)
  .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
  .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Note

setUpNotificationServices is available in Databricks Runtime 7.4 and above.

Use setUpNotificationServices(<resource-suffix>) to create an SQS queue and an SNS topic with the name <resource-prefix><resource-suffix>. If there is an existing SQS queue or SNS topic with the same name, Databricks reuses the existing resource instead of creating a new one. This function returns the URL of an SQS queue that you can pass to the cloudFiles source using .option("cloudFiles.queueUrl", <queue-url>). This enables the cloudFiles source user to have fewer permissions than the user who creates the resources. See Permissions.

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

Frequently asked questions (FAQ)

Do I need to create AWS event notification services beforehand?

No. If you choose file notification mode, Auto Loader creates an S3 > SNS topic > SQS queue file event notification pipeline automatically when you start the stream.

How do I clean up the event notification resources, such as SNS topics and SQS queues, created by Auto Loader?

You can use the cloud resource manager to list and tear down resources. You can also delete these resources manually, either in the AWS console or using AWS APIs. All resources created by Auto Loader have the prefix <resource-prefix>.

Does Auto Loader process the file again when the file gets appended or overwritten?

No. Files are processed exactly once. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, we suggest that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

Can I run multiple streaming queries from the same input directory?

Yes. Each cloud files stream, as identified by a unique checkpoint directory, has its own SQS queue, and the same S3 events can be sent to multiple SQS queues.

If my data files do not arrive continuously, but at regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger.Once Structured Streaming job and schedule it to run after the anticipated file arrival time. The first run sets up the event notification services, which remain on even when the streaming cluster is down. When you restart the stream, the cloudFiles source fetches and processes all file events backed up in the SQS queue. The benefit of using Auto Loader in this case is that you don’t need to determine which files are new and need to be processed each time, which can be very expensive.
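
For example, a sketch of such a run-once job that you might schedule daily; the format, schema string, and paths are illustrative placeholders:

# A sketch: a scheduled, run-once ingestion job. Each run processes all file events
# queued since the previous run and then stops. Paths and schema are illustrative.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .schema("event_id STRING, event_time TIMESTAMP") \
  .load("s3a://my-bucket/auto-loader/input/")

query = df.writeStream.format("delta") \
  .trigger(once=True) \
  .option("checkpointLocation", "s3a://my-bucket/auto-loader/_checkpoints/daily") \
  .start("s3a://my-bucket/auto-loader/output/")

query.awaitTermination()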

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information of a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new stream. The new stream will create new progress information and if you are using file notification mode, new AWS SNS and SQS services. You must manually clean up the checkpoint location and AWS SNS and SQS services for any abandoned streams.

Can I run multiple streaming queries from different input directories on the same S3 bucket?

Yes, as long as they are not parent-child directories, for example, prod-logs/ and prod-logs/usage/.

Can I use this feature when there are existing file notifications on my S3 bucket?

Yes, as long as your input directory does not conflict with an existing notification prefix (for example, through the parent-child relationship described in the preceding question).

Appendix: Reduced permissions after initial setup

The resource setup permissions described in Permissions are required only during the initial run of the stream. After the first run, you can switch to the following IAM policy with reduced permissions.

Important

With the reduced permissions, you won’t be able to start new streaming queries or recreate resources in case of failures (for example, if the SQS queue has been accidentally deleted); you also won’t be able to use the cloud resource management API to list or tear down resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:s3:::<bucket-name>"
      ]
    }
  ]
}