Load files from AWS S3 using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in AWS S3 (s3://).

Auto Loader provides a Structured Streaming source called cloudFiles. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.

Auto Loader works with DBFS paths as well as direct paths to the data source.

Requirements

Databricks Runtime 7.2 or above.

If you created streams using Databricks Runtime 7.1 or below, see Changes in default option values and compatibility and Cloud resource management.

File discovery modes

Auto Loader supports two modes for detecting when there are new files: directory listing and file notification.

  • Directory listing: Identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configurations other than access to your data on AWS S3 and is suitable for scenarios where only a few files need to be streamed in on a regular basis. Directory listing mode is the default for Auto Loader in Databricks Runtime 7.2 and above.
  • File notification: Uses AWS SNS and SQS services that subscribe to file events from the input directory. Auto Loader automatically sets up the AWS SNS and SQS services. File notification mode is more performant and scalable for large input directories. To use this mode, you must configure permissions for the AWS SNS and SQS services and specify .option("cloudFiles.useNotifications","true").

You can change modes when you restart the stream. For example, you may want to switch to file notification mode when directory listing becomes too slow due to the increase in input directory size. In both modes, Auto Loader internally keeps track of which files have been processed in your streaming checkpoint location to provide exactly-once semantics, so you do not need to manage any state information yourself.
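For example, here is a minimal sketch (Python, with hypothetical bucket and checkpoint paths) of restarting a stream in file notification mode using the cloudFiles source described in the next section; only the cloudFiles.useNotifications option changes, and reusing the same checkpoint location means already-processed files are not re-ingested:

# Hypothetical paths and schema; replace with your own.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  # Switch from the default directory listing mode to file notification mode.
  .option("cloudFiles.useNotifications", "true")
  .schema("id LONG, event STRING")
  .load("s3://my-bucket/events/"))

(df.writeStream.format("delta")
  # Reuse the existing checkpoint so the stream picks up where it left off.
  .option("checkpointLocation", "s3://my-bucket/_checkpoints/events/")
  .start("s3://my-bucket/delta/events/"))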

Use cloudFiles source

To use Auto Loader, create a cloudFiles source in the same way as other streaming sources. The following Python and Scala examples start an Auto Loader stream that writes to Delta Lake in directory listing mode:

Python:

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala:

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

where:

  • <cloudFiles-option> is a configuration option in Configuration.
  • <schema> is the file schema. Auto Loader also supports schema inference and evolution with some file formats. See Schema inference and evolution for more details.
  • <input-path> is the path in storage that is monitored for new files. Child directories of <input-path> are also monitored. <input-path> can contain file glob patterns. The glob pattern has * appended to it; if this matches files you don’t want to ingest, you can exclude them with an additional filter through the pathGlobFilter option. If you provide a queue for file notifications and don’t need to backfill any data, you don’t need to provide an input path.
  • <checkpoint-path> is the stream checkpoint location.
  • <trigger> is an optional trigger for the stream. The default is to execute the next micro-batch as quickly as possible. If you have data arriving at a regular interval, for example once a day, you can use Trigger Once and schedule the execution of your streams in a Databricks job (see the sketch after this list). For always-on streams, Databricks recommends that you set a processing time trigger.
  • <output-path> is the output stream path.
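As a concrete illustration of these placeholders, here is a hedged sketch (Python, with hypothetical paths and schema) that ingests only .json files once per scheduled job run using Trigger Once:

# Hypothetical bucket, schema, and checkpoint locations; adjust for your environment.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("pathGlobFilter", "*.json")   # additional filter on matched files
  .schema("id LONG, ts TIMESTAMP, payload STRING")
  .load("s3://my-bucket/raw/*/events/"))

(df.writeStream.format("delta")
  .option("checkpointLocation", "s3://my-bucket/_checkpoints/raw-events/")
  .trigger(once=True)                   # process all available data, then stop
  .start("s3://my-bucket/delta/raw-events/"))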

Benefits over Apache Spark FileStreamSource

In Apache Spark, you can read files incrementally using spark.readStream.format(fileFormat).load(directory). Auto Loader provides the following benefits over the file source:

  • Scalability: Auto Loader can discover billions of files efficiently. Backfills can be performed asynchronously to avoid wasting any compute resources.
  • Performance: The cost of discovering files with Auto Loader scales with the number of files that are being ingested instead of the number of directories that the files may land in. See Optimized directory listing.
  • Schema inference and evolution support: Auto Loader can detect schema drifts, notify you when schema changes happen, and rescue data that would have been otherwise ignored or lost. See Schema inference and evolution.
  • Cost: Auto Loader uses native cloud APIs to get lists of files that exist in storage. In addition, Auto Loader’s file notification mode can help reduce your cloud costs further by avoiding directory listing altogether. Auto Loader can automatically set up file notification services on storage to make file discovery much cheaper.

Optimized directory listing

Note

Available in Databricks Runtime 9.0 and above.

Auto Loader can discover files on cloud storage systems using directory listing more efficiently than other alternatives. For example, if you had files being uploaded every 5 minutes as /some/path/YYYY/MM/DD/HH/fileName, to find all the files in these directories, the Apache Spark file source would list all subdirectories in parallel, causing 1 (base directory) + 365 (per day) * 24 (per hour) = 8761 LIST API directory calls to storage. By receiving a flattened response from storage, Auto Loader reduces the number of API calls to the number of files in storage divided by the number of results returned by each API call, greatly reducing your cloud costs.
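To make the comparison concrete, here is a rough back-of-the-envelope sketch (Python), assuming, hypothetically, about 1,000 objects returned per LIST API call:

# Rough cost comparison for one year of hourly partitions.
files_per_hour = 12                         # one file every 5 minutes
directories = 1 + 365 * 24                  # base directory + one per hour = 8761
total_files = files_per_hour * 365 * 24     # 105,120 files

spark_file_source_calls = directories       # one LIST call per directory
auto_loader_calls = -(-total_files // 1000) # ceil(total files / results per call)

print(spark_file_source_calls, auto_loader_calls)  # 8761 vs. ~106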

Incremental Listing

Preview

This feature is in Public Preview.

For lexicographically generated files, Auto Loader can now leverage the lexical file ordering and existing optimized APIs to improve the efficiency of directory listing by listing from previously ingested files rather than listing the entire directory.

By default, Auto Loader automatically detects whether a given directory is applicable for incremental listing by checking and comparing file paths of previously completed full directory listings. To ensure eventual completeness in this auto mode, Auto Loader automatically triggers a full directory listing after completing 7 consecutive incremental listings. If you want these completeness checks to happen more or less frequently, you can set cloudFiles.backfillInterval to trigger asynchronous backfills at a given interval.

If you are confident about the order in which files are generated in the directory, you can explicitly turn incremental listing on or off by setting cloudFiles.useIncrementalListing to true or false (the default is auto). For example, files ordered by date=... partitions can be considered lexically ordered if data is processed once a day, and file paths containing timestamps can be considered lexically ordered. You can always use cloudFiles.backfillInterval to ensure that all data is ingested when you turn on incremental listing.
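For example, here is a sketch (Python, hypothetical paths) of forcing incremental listing for timestamp-named files while keeping a weekly asynchronous backfill as a safety net:

# Hypothetical example: file names embed timestamps, so lexical order matches arrival order.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.useIncrementalListing", "true")   # default is "auto"
  .option("cloudFiles.backfillInterval", "1 week")      # periodic full listing for completeness
  .schema("ts TIMESTAMP, value DOUBLE")
  .load("s3://my-bucket/metrics/"))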

Schema inference and evolution

Note

Available in Databricks Runtime 8.2 and above.

Auto Loader supports schema inference and evolution with CSV, JSON, binary (binaryFile), and text file formats. See Schema inference and evolution in Auto Loader for details.
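For example, a minimal sketch (Python, hypothetical paths) that relies on schema inference with a schema location and exact column type inference:

# Hypothetical paths; cloudFiles.schemaLocation is required when no schema is provided.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "s3://my-bucket/_schemas/events/")
  .option("cloudFiles.inferColumnTypes", "true")   # infer exact types instead of strings
  .load("s3://my-bucket/events/"))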

Run Auto Loader in production

Databricks recommends that you follow the streaming best practices for running Auto Loader in production.

Configuration

Configuration options specific to the cloudFiles source are prefixed with cloudFiles so that they are in a separate namespace from other Structured Streaming source options.

Important

Some default option values changed in Databricks Runtime 7.2. If you are using Auto Loader on Databricks Runtime 7.1 or below, see Changes in default option values and compatibility.

File format options

With Auto Loader you can ingest JSON, CSV, PARQUET, AVRO, TEXT, BINARYFILE, and ORC files. See Format options for the options for these file formats.

Common Auto Loader options

You can configure the following options for directory listing or file notification mode.

Option

cloudFiles.allowOverwrites

Type: Boolean

Whether to allow input directory file changes to overwrite existing data. Available in Databricks Runtime 7.6 and above.

Default value: false

cloudFiles.format

Type: String

The data file format in the source path. Allowed values include avro, binaryFile, csv, json, orc, parquet, and text.

Default value: None (required option)

cloudFiles.includeExistingFiles

Type: Boolean

Whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. This option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect.

Default value: true

cloudFiles.inferColumnTypes

Type: Boolean

Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON datasets. See schema inference for more details.

Default value: false

cloudFiles.maxBytesPerTrigger

Type: Byte String

The maximum number of new bytes to be processed in every trigger. You can specify a byte string such as 10g to limit each microbatch to 10 GB of data. This is a soft maximum. If you have files that are 3 GB each, Databricks processes 12 GB in a microbatch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once().

Default value: None
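For example, a hedged sketch (Python, hypothetical limits and paths) combining both rate-limiting options; whichever limit is reached first bounds the microbatch:

# Hypothetical limits: at most 1,000 files and roughly 10 GB per microbatch.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.maxFilesPerTrigger", 1000)
  .option("cloudFiles.maxBytesPerTrigger", "10g")
  .schema("id LONG, payload STRING")
  .load("s3://my-bucket/parquet-input/"))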

cloudFiles.maxFileAge

Type: Interval String

How long a file event is tracked for deduplication purposes. Databricks does not recommend tuning this parameter unless you are ingesting data at the order of millions of files an hour. See the section on How to choose maxFileAge for more details.

Default value: None

cloudFiles.resourceTags

Type: Map(String, String)

A series of key-value tag pairs to help associate and identify related resources, for example:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")           .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

For more information, see Amazon SQS cost allocation tags and Configuring tags for an Amazon SNS topic. (1)

Default value: None

cloudFiles.schemaEvolutionMode

Type: String

The mode for evolving the schema as new columns are discovered in the data. By default, columns are inferred as strings when inferring JSON datasets. See schema evolution for more details.

Default value: "addNewColumns" when a schema is not provided. "none" otherwise.

cloudFiles.schemaHints

Type: String

Schema information that you provide to Auto Loader during schema inference. See schema hints for more details.

Default value: None

cloudFiles.schemaLocation

Type: String

The location to store the inferred schema and subsequent changes. See schema inference for more details.

Default value: None (required when inferring the schema)

cloudFiles.validateOptions

Type: Boolean

Whether to validate Auto Loader options and return an error for unknown or inconsistent options.

Default value: true

cloudFiles.backfillInterval

Preview

This feature is in Public Preview.

Type: Interval String

Auto Loader can trigger asynchronous backfills at a given interval, for example 1 day to backfill once a day, or 1 week to backfill once a week. File event notification systems do not guarantee 100% delivery of all files that have been uploaded, so you can use backfills to guarantee that all files eventually get processed (available in Databricks Runtime 8.4 and Databricks Runtime 8.4 Photon and above). If you use incremental listing, you can also use regular backfills to guarantee eventual completeness (available in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above).

Default value: None

(1) Auto Loader adds the following key-value tag pairs by default on a best-effort basis:

  • vendor: Databricks
  • path: The location from where the data is loaded.
  • checkpointLocation: The location of the stream’s checkpoint.
  • streamId: A globally unique identifier for the stream.

These key names are reserved and you cannot overwrite their values.

Directory Listing options

The following options are relevant to directory listing mode.

Option

cloudFiles.useIncrementalListing

Preview

This feature is in Public Preview.

Type: String

Whether to use incremental listing rather than full listing in directory listing mode. By default, Auto Loader makes a best effort to automatically detect whether a given directory is applicable for incremental listing. You can explicitly force incremental listing or full directory listing by setting this option to true or false, respectively.

Available in Databricks Runtime 9.1 LTS and Databricks Runtime 9.1 LTS Photon and above.

Default value: auto

Available values: auto, true, false

How to choose maxFileAge

Note

Available in Databricks Runtime 8.4 and above.

Auto Loader keeps track of discovered files in the checkpoint location using RocksDB to provide exactly-once ingestion guarantees. For high volume datasets, you can use the maxFileAge option to expire events from the checkpoint location. The minimum value that you can set for maxFileAge is "14 days". Deletes in RocksDB appear as tombstone entries, therefore you should expect the storage usage to increase as events expire before it starts to level off.

Warning

maxFileAge is provided as a cost control mechanism for high volume datasets, ingesting in the order of millions of files every hour. Tuning maxFileAge incorrectly can lead to data quality issues. Therefore, Databricks doesn’t recommend tuning this parameter unless absolutely required.

Trying to tune the maxFileAge option can lead to unprocessed files being ignored by Auto Loader, or to already processed files expiring and then being reprocessed, causing duplicate data. Here are some things to consider when choosing a maxFileAge value:

  • If your stream restarts after a long time, file notification events that are pulled from the queue that are older than maxFileAge are ignored. Similarly, if you use directory listing, files that may have appeared during the down time that are older than maxFileAge are ignored.
  • If you use directory listing mode with maxFileAge set to, for example, "1 month", then stop your stream and restart it with maxFileAge set to "2 months", all files that are older than 1 month but more recent than 2 months are reprocessed.

The best approach to tuning maxFileAge is to start from a generous expiration, for example "1 year", and work downwards to something like "9 months". If you set this option the first time you start the stream, you will not ingest data older than maxFileAge; therefore, if you want to ingest old data, do not set this option when you start your stream.
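If you do need to set it, a minimal sketch (Python, hypothetical value and paths) that starts from a generous expiration looks like this:

# Only relevant for very high-volume ingestion; "1 year" is a conservative starting point.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.maxFileAge", "1 year")
  .schema("id LONG, payload STRING")
  .load("s3://my-bucket/high-volume/"))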

File notification options

The following options are relevant to file notification mode.

Option

cloudFiles.fetchParallelism

Type: Integer

Number of threads to use when fetching messages from the queueing service.

Default value: 1

cloudFiles.pathRewrites

Type: A JSON string

Required only if you specify a queueUrl that receives file notifications from multiple S3 buckets and you want to leverage mount points configured for accessing data in these buckets. Use this option to rewrite the prefix of the bucket/key path with the mount point. Only prefixes can be rewritten. For example, for the configuration {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, the path s3://<databricks-mounted-bucket>/path/2017/08/fileA.json is rewritten to dbfs:/mnt/data-warehouse/2017/08/fileA.json.

Default value: None
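For example, a sketch (Python, with a hypothetical queue URL and mount point) that consumes notifications from a shared queue and rewrites bucket prefixes to a DBFS mount:

import json

# Hypothetical values: a shared SQS queue receiving events for several buckets,
# and a DBFS mount point backed by one of those buckets.
path_rewrites = {"my-mounted-bucket/path": "dbfs:/mnt/data-warehouse"}

df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .option("cloudFiles.queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue")
  .option("cloudFiles.pathRewrites", json.dumps(path_rewrites))
  .schema("id LONG, payload STRING")
  .load("dbfs:/mnt/data-warehouse"))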

cloudFiles.queueUrl

Type: String

The URL of the SQS queue. If provided, the cloud files source directly consumes events from this queue instead of setting up its own AWS SNS and SQS services.

Default value: None

cloudFiles.useNotifications

Type: Boolean

Whether to use file notification mode to determine when there are new files. If false, use directory listing mode. See File discovery modes.

Default value: false

Provide the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:

Option

cloudFiles.region

Type: String

The region where the source S3 bucket resides and where the AWS SNS and SQS services will be created.

Default value: In Databricks Runtime 9.0 and above, the region of the EC2 instance. In Databricks Runtime 8.4 and below, you must specify the region.

You can use the following options to provide credentials to access AWS SNS and SQS when IAM roles are not available or when you’re ingesting data from different clouds.

Option

cloudFiles.awsAccessKey

Type: String

The AWS access key ID for the user. Must be provided with cloudFiles.awsSecretKey.

Default value: None

cloudFiles.awsSecretKey

Type: String

The AWS secret access key for the user. Must be provided with cloudFiles.awsAccessKey.

Default value: None

cloudFiles.roleArn

Type: String

The ARN of an IAM role to assume. The role can be assumed from your cluster’s instance profile or by providing credentials with cloudFiles.awsAccessKey and cloudFiles.awsSecretKey.

Default value: None

cloudFiles.roleExternalId

Type: String

An identifier to provide while assuming a role using cloudFiles.roleArn.

Default value: None

cloudFiles.roleSessionName

Type: String

An optional session name to use while assuming a role using cloudFiles.roleArn.

Default value: None

cloudFiles.stsEndpoint

Type: String

An optional endpoint to provide for accessing AWS STS when assuming a role using cloudFiles.roleArn.

Default value: None
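For example, here is a sketch (Python, with hypothetical ARNs, region, and paths) of running file notification mode while assuming an IAM role for the SNS and SQS setup:

# Hypothetical role ARN, external ID, and region; the cluster's instance profile
# assumes the role used to set up and read the notification services.
df = (spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .option("cloudFiles.region", "us-west-2")
  .option("cloudFiles.roleArn", "arn:aws:iam::123456789012:role/auto-loader-setup")
  .option("cloudFiles.roleExternalId", "my-external-id")
  .schema("id LONG, payload STRING")
  .load("s3://my-bucket/events/"))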

Changes in default option values and compatibility

The default values of the following Auto Loader options changed in Databricks Runtime 7.2 to the values listed in Configuration.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

Auto Loader streams started on Databricks Runtime 7.1 and below have the following default option values:

  • cloudFiles.useNotifications is true
  • cloudFiles.includeExistingFiles is false
  • cloudFiles.validateOptions is false

To ensure compatibility with existing applications, these default option values do not change when you run your existing Auto Loader streams on Databricks Runtime 7.2 or above; the streams will have the same behavior after the upgrade.

Permissions

You must have read permissions for the input directory. See S3 connection details for more details.

To use file notification mode, attach the following JSON policy document to your IAM user or role.

If you are unable to set up the permissions specified in the JSON policy document, you can optionally ask an administrator to perform setup for you using the Cloud resource management Scala API. An administrator can provide you with the queue URL, which you can provide directly as .option("cloudFiles.queueUrl", <queue-url>) to the cloudFiles source. With this configuration, you need only reduced permissions. See Appendix: Reduced permissions after initial setup for details.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

where:

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running %fs mounts.
  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. If you don’t want to specify the region, use *.
  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. If you don’t want to specify the account number, use *.

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when creating SQS and SNS services. Since Databricks sets up the notification services in the initial run of the stream, you can use a policy with reduced permissions after the initial run (for example, stop the stream and then restart it). See Appendix: Reduced permissions after initial setup for details.

Note

The preceding policy is concerned only with the permissions needed for setting up file notification services, namely S3 bucket notification, SNS, and SQS services and assumes you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the DatabricksAutoLoaderSetup statement in the JSON document:

  • s3:ListBucket
  • s3:GetObject

Securely ingest data in a different AWS account

Auto Loader can load data across AWS accounts by assuming an IAM role. After setting up the temporary security credentials created by AssumeRole, you can have Auto Loader load cloud files across accounts. To set up Auto Loader for cross-account access, follow the documentation: Secure access to S3 buckets across accounts using instance profiles with an AssumeRole policy. Make sure you:

  • Verify that you have the AssumeRole meta role assigned to the cluster.

  • Configure the cluster’s Spark configuration to include the following properties:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

Metrics

Auto Loader reports metrics at every batch. You can view how many files exist in the backlog and how large the backlog is in the numFilesOutstanding and numBytesOutstanding metrics under the Raw Data tab in the streaming query progress dashboard:

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}
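The same numbers are available programmatically through the streaming query's progress; here is a minimal sketch (Python), assuming query is the StreamingQuery handle returned by writeStream.start():

# Metrics for the cloudFiles source appear in the progress JSON under "sources".
progress = query.lastProgress            # dict form of the latest progress event, or None
if progress:
    for source in progress["sources"]:
        metrics = source.get("metrics", {})
        print(source["description"],
              metrics.get("numFilesOutstanding"),
              metrics.get("numBytesOutstanding"))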

Cloud resource management

You can use a Scala API to manage the AWS SNS and SQS services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

Important

If you have used Auto Loader in Databricks Runtime 7.1 and below, update your IAM policy using the JSON policy document in Permissions. There are new statements in the policy for Databricks Runtime 7.2—DatabricksAutoLoaderList and DatabricksAutoLoaderTeardown—that specify the additional permissions required by the Scala API.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Note

Available in Databricks Runtime 7.4 and above.

Use setUpNotificationServices(<resource-suffix>) to create an SQS queue and an SNS topic with the name databricks-auto-ingest-<resource-suffix>. If there is an existing SQS queue or SNS topic with the same name, Databricks reuses the resource that already exists instead of creating a new one. This function returns an SQS queue that you can pass to the cloudFiles source using .option("cloudFiles.queueUrl", <queue-url>). This enables the cloudFiles source user to have fewer permissions than the user who creates the resources. See Permissions.

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

Frequently asked questions (FAQ)

Do I need to create AWS event notification services beforehand?

No. If you choose file notification mode, Auto Loader creates an AWS S3 > SNS Topic > SQS Queue file event notification pipeline automatically when you start the stream.

How do I clean up the event notification resources, such as SNS topics and SQS queues, created by Auto Loader?

You can use the cloud resource manager to list and tear down resources. You can also delete these resources manually, either in the AWS console or using AWS APIs. All resources created by Auto Loader have the prefix databricks-auto-ingest-.

Does Auto Loader process the file again when the file gets appended or overwritten?

Files are processed exactly once unless you enable cloudFiles.allowOverwrites. If a file is appended to or overwritten, Databricks does not guarantee which version of the file is processed. For well-defined behavior, Databricks recommends that you use Auto Loader to ingest only immutable files. If this does not meet your requirements, contact your Databricks representative.

Can I run multiple streaming queries from the same input directory?

Yes. Each cloud files stream, as identified by a unique checkpoint directory, has its own SQS queue, and the same AWS S3 events can be sent to multiple SQS queues.

If my data files do not arrive continuously, but in regular intervals, for example, once a day, should I still use this source and are there any benefits?

Yes and yes. In this case, you can set up a Trigger.Once Structured Streaming job and schedule it to run after the anticipated file arrival time. The first run sets up the event notification services, which are always on, even when the streaming cluster is down. When you restart the stream, the cloudFiles source fetches and processes all file events backed up in the SQS queue. The benefit of using Auto Loader for this case is that you don’t need to determine which files are new and need to be processed each time, which can be very expensive.

What happens if I change the checkpoint location when restarting the stream?

A checkpoint location maintains important identifying information of a stream. Changing the checkpoint location effectively means that you have abandoned the previous stream and started a new stream. The new stream will create new progress information and if you are using file notification mode, new AWS SNS and SQS services. You must manually clean up the checkpoint location and AWS SNS and SQS services for any abandoned streams.

Can I run multiple streaming queries from different input directories on the same S3 bucket?

Yes, as long as they are not parent-child directories, for example, prod-logs/ and prod-logs/usage/.

Can I use this feature when there are existing file notifications on my S3 bucket?

Yes, as long as your input directory does not conflict with the existing notification prefix (for example, the above parent-child directories).

Appendix: Reduced permissions after initial setup

The resource setup permissions described in Permissions are required only during the initial run of the stream. After the first run, you can switch to the following IAM policy with reduced permissions.

Important

With the reduced permissions, you won’t be able to start new streaming queries or recreate resources in case of failures (for example, if the SQS queue has been accidentally deleted); you also won’t be able to use the cloud resource management API to list or tear down resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}