Load files from S3 using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in S3 without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Requirements

Databricks Runtime 7.2 or above.

If you created streams using Databricks Runtime 7.1 or below, see Changes in default option values and compatibility and Cloud resource management.

New file detection modes

Auto Loader supports two modes for detecting when there are new files:

  • Directory listing: Identify new files by parallel listing of the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configuration and is suitable for scenarios where only a few files need to be streamed in on a regular basis. Directory listing mode is the default for Auto Loader in Databricks Runtime 7.2 and above.
  • File notification: Use AWS SNS and SQS services that subscribe to file events from the input directory. Auto Loader automatically sets up the AWS SNS and SQS services. File notification mode is more performant and scalable for large input directories. To use this mode, you must configure permissions for the AWS SNS and SQS services and specify

    .option("cloudFiles.useNotifications", "true")
    

You can change modes when you restart the stream. For example, you may want to switch to file notification mode when directory listing becomes too slow as the input directory grows. In both modes, Auto Loader internally keeps track of which files have been processed to provide exactly-once semantics, so you do not need to manage any state information yourself.

Use cloudFiles source

You use a cloudFiles source in the same way as other streaming sources:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .start(<output-path>)

where:

  • <cloudFiles-option> is an option and <option-value> is an option value listed in Configuration.
  • <schema> is the file schema. Note: On Databricks Runtime 7.3 LTS and above, you don’t need to provide the schema if the file format is text or binaryFile.
  • <input-path> is the path in S3 that is monitored for new files. Subdirectories of <input-path> are also monitored. <input-path> can contain file glob patterns.
  • <checkpoint-path> is the output stream checkpoint location.
  • <output-path> is the output stream path.
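
For example, a minimal filled-in sketch in Python; the file format, schema, and all paths below are hypothetical placeholders, not values from your workspace:

# Hypothetical JSON ingest: read new files from an S3 input path and write them to a Delta output path.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .schema("id INT, eventType STRING, eventTime TIMESTAMP") \
  .load("s3://auto-logs/input/")

df.writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/auto-logs") \
  .start("/mnt/delta/auto-logs")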

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

Important

Some default option values changed in Databricks Runtime 7.2. If you are using Auto Loader on Databricks Runtime 7.1 or below, see Changes in default option values and compatibility for details.

  • cloudFiles.format (String; default: none, required): The data file format in the source path, for example json, csv, text, parquet, or binaryFile.
  • cloudFiles.includeExistingFiles (Boolean; default: true): Whether to include existing files in the input path in the streaming processing, versus processing only new files that arrive after the notifications are set up. This option is respected only when you start a stream for the first time; changing its value after a stream restart has no effect. This option has no effect in directory listing mode.
  • cloudFiles.maxFilesPerTrigger (Integer; default: 1000): Maximum number of new files to be processed in each trigger. This option has no effect when used with Trigger.Once().
  • cloudFiles.maxBytesPerTrigger (Byte string, such as 10g; default: none): Maximum number of new bytes to be processed in each trigger. You can specify a byte string such as 10g to limit each micro-batch to 10 GB of data. This is a soft maximum: if you have files that are 3 GB each, Databricks processes 12 GB in a micro-batch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once().
  • cloudFiles.useNotifications (Boolean; default: false): Whether to use file notification mode to determine when there are new files. If false, use directory listing mode.
  • cloudFiles.validateOptions (Boolean; default: true): Whether to validate Auto Loader options and return an error for unknown or inconsistent options.
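
For example, a sketch of combining the two rate-limiting options; the limits, schema, and path are illustrative assumptions only:

# Limit each micro-batch to at most 500 files or roughly 5 GB, whichever limit is reached first.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.maxFilesPerTrigger", "500") \
  .option("cloudFiles.maxBytesPerTrigger", "5g") \
  .schema("id INT, body STRING") \
  .load("s3://my-bucket/input/")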

You must provide one of the following authentication options only if you choose file notification mode (cloudFiles.useNotifications = true):

  • cloudFiles.region (String; default: none): The region where the source S3 bucket resides and where the AWS SNS and SQS services are created.
  • cloudFiles.queueUrl (String; default: none): The URL of the SQS queue. If provided, the cloud files source directly consumes events from this queue instead of setting up its own AWS SNS and SQS services.
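
As an illustration, a minimal sketch of supplying these options when file notification mode is enabled; the region, queue URL, schema, and path shown are placeholders:

# Let Auto Loader create its own SNS topic and SQS queue in the given region.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .option("cloudFiles.region", "us-west-2") \
  .schema("id INT, body STRING") \
  .load("s3://my-bucket/input/")

# Alternatively, consume events from an SQS queue you manage yourself instead of
# having Auto Loader create one:
#   .option("cloudFiles.queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue")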

Changes in default option values and compatibility

The default values of the following Auto Loader options changed in Databricks Runtime 7.2 to the values listed in Configuration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Auto Loader streams started on Databricks Runtime 7.1 and below have the following default option values:

  • cloudFiles.useNotifications is true
  • cloudFiles.includeExistingFiles is false
  • cloudFiles.validateOptions is false

To ensure compatibility with existing applications, these default option values do not change when you run your existing Auto Loader streams on Databricks Runtime 7.2 or above; the streams will have the same behavior after the upgrade.

Permissions

To use file notification mode, attach the following JSON policy document to your IAM user or role.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DatabricksAutoLoaderSetup",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketNotification",
                "s3:PutBucketNotification",
                "sns:ListSubscriptionsByTopic",
                "sns:GetTopicAttributes",
                "sns:SetTopicAttributes",
                "sns:CreateTopic",
                "sns:TagResource",
                "sns:Publish",
                "sns:Subscribe",
                "sqs:CreateQueue",
                "sqs:DeleteMessage",
                "sqs:DeleteMessageBatch",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:SetQueueAttributes",
                "sqs:TagQueue",
                "sqs:ChangeMessageVisibility",
                "sqs:ChangeMessageVisibilityBatch"
            ],
            "Resource": [
                "arn:aws:s3:::<bucket-name>",
                "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
                "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
            ]
        },
        {
            "Sid": "DatabricksAutoLoaderList",
            "Effect": "Allow",
            "Action": [
                "sqs:ListQueues",
                "sqs:ListQueueTags",
                "sns:ListTopics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DatabricksAutoLoaderTeardown",
            "Effect": "Allow",
            "Action": [
                "sns:Unsubscribe",
                "sns:DeleteTopic",
                "sqs:DeleteQueue"
            ],
            "Resource": [
                "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
                "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
            ]
        }
    ]
}

where:

  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. If you don’t want to specify the region, use *.

  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. If you don’t want to specify the account number, use *.

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running:

    %fs mounts
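
If you prefer Python, the same information is available from dbutils in a notebook (a sketch; it prints each mount point and the storage location that backs it):

# List DBFS mount points and their underlying storage sources.
for mount in dbutils.fs.mounts():
    print(mount.mountPoint, "->", mount.source)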
    

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when creating SQS and SNS services. Since Databricks sets up the notification services in the initial run of the stream, you can use a policy with reduced permissions after the initial run (for example, stop the stream and then restart it). See Appendix: Reduced permissions after initial setup for details.

Note

The preceding policy is concerned only with the permissions needed to set up file notification services, namely S3 bucket notification, SNS, and SQS services, and assumes you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the DatabricksAutoLoaderSetup statement in the JSON document:

  • s3:ListBucket
  • s3:GetObject

Cloud resource management

You can use a Scala API to manage the AWS SNS and SQS services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

Important

If you have used Auto Loader in Databricks Runtime 7.1 and below, update your IAM policy using the JSON policy document in Permissions. There are new statements in the policy for Databricks Runtime 7.2—DatabricksAutoLoaderList and DatabricksAutoLoaderTeardown—which specify the additional permissions required by the Scala API.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
  .newManager
  .option("cloudFiles.region", <region>)
  .create()

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Frequently asked questions (FAQ)

Do I need to create AWS event notification services beforehand?

No. If you choose file notification mode, Auto Loader creates an S3 > SNS topic > SQS queue file event notification pipeline automatically when you start the stream.

How do I clean up the event notification resources, such as SNS topics and SQS queues, created by Auto Loader?

For now, you must delete these resources manually, either in the AWS console or using the AWS APIs. All resources created by Auto Loader have the prefix databricks-.

Does Auto Loader process the file again when the file gets appended or overwritten?

No. Files are processed exactly once. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, we suggest that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

Can I run multiple streaming queries from the same input directory?

Yes. Each cloud files stream, as identified by a unique checkpoint directory, has its own SQS queue, and the same S3 events can be sent to multiple SQS queues.
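
For example, a sketch of two independent queries over the same input path, distinguished only by their checkpoint locations; all paths and the schema are hypothetical:

# Two queries on the same cloudFiles source; each has its own checkpoint directory,
# and therefore its own file-tracking state (and, in file notification mode, its own SQS queue).
events = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .schema("id INT, body STRING") \
  .load("s3://my-bucket/input/")

events.writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/query-a") \
  .start("/mnt/delta/output-a")

events.writeStream.format("delta") \
  .option("checkpointLocation", "/mnt/checkpoints/query-b") \
  .start("/mnt/delta/output-b")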

If my data files do not arrive continuously, but in regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger.Once Structured Streaming job and schedule it to run after the anticipated file arrival time. The first run sets up the event notification services, which are always on, even when the streaming cluster is down. When you restart the stream, the cloudFiles source fetches and processes all file events backed up in the SQS queue. The benefit of using Auto Loader for this case is that you don’t need to determine which files are new and need to be processed each time, which can be very expensive.
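
As a sketch of that pattern, the write side can use a once trigger so each scheduled run drains whatever has accumulated in the queue and then stops; the paths, region, and schema are placeholders:

# Scheduled batch-style run: process all file events backed up since the last run, then stop.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.useNotifications", "true") \
  .option("cloudFiles.region", "us-west-2") \
  .schema("id INT, body STRING") \
  .load("s3://my-bucket/input/")

df.writeStream.format("delta") \
  .trigger(once=True) \
  .option("checkpointLocation", "/mnt/checkpoints/daily-ingest") \
  .start("/mnt/delta/daily-ingest")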

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information for a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new stream. The new stream creates new progress information and, if you are using file notification mode, new AWS SNS and SQS services. You must manually clean up the checkpoint location and any AWS SNS and SQS services for abandoned streams.

Can I run multiple streaming queries from different input directories on the same S3 bucket?

Yes, as long as they are not parent-child directories, for example, prod-logs/ and prod-logs/usage/.

Can I use this feature when there are existing file notifications on my S3 bucket?

Yes, as long as your input directory does not conflict with an existing notification prefix (for example, through a parent-child relationship like the one described in the preceding question).

Appendix: Reduced permissions after initial setup

The resource setup permissions described in Permissions are required only during the initial run of the stream. After the first run, you can switch to the following IAM policy with reduced permissions.

Important

With the reduced permissions, you won’t be able to start new streaming queries or recreate resources in case of failures (for example, if the SQS queue is accidentally deleted); you also won’t be able to use the cloud resource management API to list or tear down resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:s3:::<bucket-name>"
      ]
    }
  ]
}