Load files from S3 using Auto Loader

Preview

This feature is in Public Preview.

Auto Loader incrementally and efficiently processes new data files as they arrive in S3 without any additional setup. Auto Loader provides a new Structured Streaming source called cloudFiles. Given an input directory path in cloud file storage, the cloudFiles source automatically sets up file notification services that subscribe to file events from the input directory and processes new files as they arrive, with the option of also processing existing files in that directory. You can use it in the same way as other streaming sources:

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema(/* your schema here */)
  .load("/input/path")

df.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .start("/out/path")

The benefits of Auto Loader are that it:

  • Incrementally processes new files as they arrive in S3 by leveraging AWS SNS and SQS services, instead of having to list the directory. This approach is efficient and economical, especially when the input directory contains a huge number of files.
  • Automatically sets up AWS SNS and SQS required for incrementally processing the files, so you do not need to set up these services manually.
  • Internally keeps track of what files have been processed to provide exactly-once semantics, so you do not need to manage any state information yourself.

Requirements

Databricks Runtime 6.3 or above.

Permissions

You can attach the following JSON policy document to your IAM user or role.

{
  "Version": "2012-10-17",
  "Statement": [
   {
      "Sid": "DatabricksAutoIngestSetup",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "s3:PutBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:SetTopicAttributes",
       "sns:CreateTopic",
       "sns:TagResource",
       "sns:Publish",
       "sns:Subscribe",
       "sqs:CreateQueue",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:SetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:s3:::<bucket-name>"
      ]
    }
  ]
}

where:

  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. You can use * if you don’t want to specify the region.

  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. You can use * if you do not want to specify the account number.

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running:

    %fs mounts
    

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when it creates SQS queues and SNS topics. Because the notification services need to be set up only during the initial run of the stream, you can switch to a policy with reduced permissions after that run (for example, stop the stream, attach the reduced policy, and then restart the stream). See Appendix: Reduced permissions after initial setup for details.

Note

The preceding policy is concerned only with the permissions needed for setting up file notification services, namely S3 bucket notification, SNS and SQS. We assume you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the JSON document:

  • s3:ListBucket
  • s3:GetObject
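
For example, read access could also be granted as a separate statement next to the setup statement above. The following is only a sketch: the Sid is illustrative, and the object ARN (<bucket-name>/*) is included because s3:GetObject applies to objects rather than to the bucket itself:

{
  "Sid": "DatabricksAutoIngestRead",
  "Effect": "Allow",
  "Action": [
    "s3:ListBucket",
    "s3:GetObject"
  ],
  "Resource": [
    "arn:aws:s3:::<bucket-name>",
    "arn:aws:s3:::<bucket-name>/*"
  ]
}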

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

  • cloudFiles.format: The format of the data files in the source path, for example, json, csv, text, or parquet. This option is required and has no default.

  • cloudFiles.region: The cloud provider region in which the queue is defined, for example, us-west-2. This option is required and has no default.

  • cloudFiles.includeExistingFiles: A boolean value that controls whether to include files that already exist in the input path in the streaming processing, versus processing only new files that arrive after the notifications are set up. Defaults to false. This option is respected only when you start a stream for the first time; changing its value after a stream restart has no effect.
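
For example, a stream that reads CSV files from a bucket in us-west-2 and also processes the files already present in the input directory might combine these options as follows. This is a sketch in the style of the earlier example; the format, region, schema, and paths are placeholders for your own values:

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.region", "us-west-2")
  .option("cloudFiles.includeExistingFiles", "true")
  .schema(/* your schema here */)
  .load("/input/path")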

Frequently asked questions (FAQ)

Do I need to create AWS event notification services beforehand?

No, Auto Loader sets up the file event notification pipeline (S3 bucket notification > SNS topic > SQS queue) automatically when you start a stream for the first time.

How do I clean up the event notification resources, such as SNS topics and SQS queues, created by Auto Loader?

For now, you must delete these resources manually, either in the AWS console or by using the AWS APIs. All resources created by Auto Loader have names beginning with the prefix databricks-auto-ingest-.

Does Auto Loader process the file again when the file gets appended or overwritten?

No. Files are processed exactly once. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, we suggest that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

Can I run multiple streaming queries from the same input directory?

Yes. Each cloudFiles stream, as identified by a unique checkpoint directory, has its own SQS queue, and the same S3 events can be sent to multiple SQS queues.
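
As a minimal sketch (the checkpoint and output paths are placeholders), two queries started from the cloudFiles DataFrame of the earlier example, each with its own checkpoint location, each maintain their own queue:

// Each query has its own checkpoint directory, and therefore its own SQS queue
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/query-1")
  .start("/out/query-1")

df.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/query-2")
  .start("/out/query-2")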

If my data files do not arrive continuously, but at regular intervals, for example, once a day, should I still use this source, and are there any benefits?

Yes and yes. In this case, you can set up a Trigger-Once Structured Streaming job and schedule it to run after the anticipated file arrival time. The first run sets up the event notification services, which remain on even when the streaming cluster is down. When you restart the stream, the cloudFiles source fetches and processes all file events that have backed up in the SQS queue. The benefit of using Auto Loader for this case is that you don't need to determine which files are new and need to be processed each time, which can be very expensive.
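
A minimal sketch of such a job, reusing the df from the first example and the Trigger.Once trigger from Structured Streaming (the paths are placeholders):

import org.apache.spark.sql.streaming.Trigger

// Process all file events queued since the last run, then stop the query
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .trigger(Trigger.Once())
  .start("/out/path")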

Can I run multiple streaming queries from different input directories on the same S3 bucket?

Yes, as long as none of the directories is a parent or child of another; for example, prod-logs/ and prod-logs/usage/ would conflict.

Can I use this feature when there are existing file notifications on my S3 bucket?

Yes, as long as your input directory does not conflict with an existing notification prefix (for example, through the parent-child relationship described above).

Appendix: Reduced permissions after initial setup

This feature requires the resource setup permissions described in Permissions only during the initial run of the stream. After the first run, you can switch to the IAM policy below, which removes the following permissions:

  • s3:PutBucketNotification
  • sns:CreateTopic
  • sns:SetTopicAttributes
  • sns:Subscribe
  • sqs:CreateQueue
  • sqs:SetQueueAttributes

Important

With the reduced permissions, you won't be able to start new streaming queries or recreate resources in case of failure (for example, if the SQS queue is accidentally deleted).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoIngestSetup",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*",
       "arn:aws:s3:::<bucket-name>"
      ]
    }
  ]
}