Amazon S3 source with Amazon SQS (legacy)

Important

This documentation has been retired and might not be updated. The products, services, or technologies mentioned in this content are no longer supported. See What is Auto Loader?.

The S3-SQS connector uses Amazon Simple Queue Service (SQS) to provide an optimized Amazon S3 source that lets you find new files written to an S3 bucket without repeatedly listing all of the files. This provides two advantages:

  • Lower latency: no need to list large buckets on S3, which is slow and resource intensive.

  • Lower costs: no more costly LIST API requests made to S3.

Note

The S3-SQS source deletes messages from the SQS queue as it consumes events. If you want to have other pipelines consuming from the same queue, set up a separate SQS queue for the optimized reader. You can use SNS to publish messages to multiple SQS queues.
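
You can wire up this fan-out with the AWS SDK. The following is a minimal sketch, assuming boto3, placeholder topic and queue names, and the us-west-2 region; a real setup also needs SQS queue policies that allow the SNS topic to send messages, and the S3 bucket notification must target the topic:

    import boto3

    sns = boto3.client("sns", region_name="us-west-2")
    sqs = boto3.client("sqs", region_name="us-west-2")

    # Placeholder names -- one topic fans out to one queue per pipeline.
    topic_arn = sns.create_topic(Name="s3-events")["TopicArn"]

    for queue_name in ["s3-sqs-reader", "other-pipeline"]:
        queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]
        queue_arn = sqs.get_queue_attributes(
            QueueUrl=queue_url, AttributeNames=["QueueArn"]
        )["Attributes"]["QueueArn"]
        # Each subscribed queue receives its own copy of every S3 event.
        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)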

Use the S3-SQS file source

To use the S3-SQS file source, you must:

  • Set up event notifications and route them to SQS. See Configuring Amazon S3 Event Notifications.

  • Specify the fileFormat and queueUrl options and a schema. For example (a fuller sketch follows this list):

    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .schema(...) \
      .load()
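
A fuller sketch of the same read, with an explicit schema; the queue URL, region, and schema fields below are placeholders, not values from the original example:

    from pyspark.sql.types import StringType, StructField, StructType

    # Placeholder schema -- substitute the schema of your files.
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("payload", StringType(), True),
    ])

    # Placeholder queue URL and region.
    df = spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/my-queue") \
      .option("region", "us-west-2") \
      .schema(schema) \
      .load()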
    

Authenticate with Amazon SQS and S3

Databricks uses Amazon’s default credential provider chain for authentication to SQS. We recommend that you launch your Databricks clusters with an instance profile that can access SQS and your S3 bucket.

This source requires sqs:ReceiveMessage, sqs:DeleteMessage, and s3:GetObject permissions. If you experience Amazon: Access Denied exceptions, check that your user or profile has these permissions. See Using Identity-Based (IAM) Policies for Amazon SQS and Bucket Policy Examples for details.
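
As an illustration only, these permissions could be granted with an inline policy on the instance profile's role; the role name, policy name, and ARNs below are hypothetical:

    import json

    import boto3

    # Hypothetical ARNs -- scope these to your own queue and bucket.
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": "arn:aws:sqs:us-west-2:123456789012:my-queue",
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": "arn:aws:s3:::my-bucket/*",
            },
        ],
    }

    boto3.client("iam").put_role_policy(
        RoleName="my-databricks-instance-profile-role",  # hypothetical
        PolicyName="s3-sqs-source",
        PolicyDocument=json.dumps(policy),
    )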

Configuration

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| allowOverwrites | Boolean | true | Whether a blob that gets overwritten should be reprocessed. |
| excludeRegex | String | None | Excludes files whose paths match this regular expression. |
| fetchParallelism | Integer | 1 | Number of threads to use when fetching messages from the queueing service. |
| fileFormat | String | None (required) | The format of the files, such as parquet, json, csv, text, and so on. |
| maxFileAge | Integer | 604800 | How long (in seconds) file notifications are stored as state to prevent duplicate processing. |
| maxFilesPerTrigger | Integer | 1000 | Maximum number of new files to be considered in every trigger. |
| pathRewrites | A JSON string | "{}" | If you use mount points, you can rewrite the prefix of the bucket/key path with the mount point. Only prefixes can be rewritten. For example, given the configuration {"<databricks-mounted-bucket>/path": "/mnt/data-warehouse"}, the path <databricks-mounted-bucket>/path/2017/08/fileA.json is rewritten to /mnt/data-warehouse/2017/08/fileA.json. |
| queueUrl | String | None (required) | The URL of the SQS queue. |
| region | String | Locally resolved region | The region the queue is defined in. |
| sqsFetchInterval | A duration string, for example, 2m for 2 minutes | "5s" | How long to wait between fetches if the queue is empty. AWS charges per API request to SQS, so if data isn’t arriving frequently, this value can be set to a long duration. SQS supports long polling with a maximum duration of 20 seconds; if this value is set longer than 20 seconds, the source sleeps for the remaining duration. As long as the queue is not empty, it fetches continuously. |

If new files are created every 5 minutes, such as with Kinesis Firehose or CloudTrail logs, you might want to set a high sqsFetchInterval to reduce SQS costs.
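
For example, a reader tuned for such a cadence might look like the following sketch; the 5m interval is illustrative:

    # Illustrative: when the queue is empty, wait 5 minutes between fetches.
    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .option("sqsFetchInterval", "5m") \
      .schema(...) \
      .load()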

If you observe many driver log messages that look like Fetched 0 new events and 3 old events., with far more old events than new events, you can either reduce the trigger interval of your stream or increase the visibility timeout of your SQS queue.

If you are consuming files from a location on S3 where you expect that some files may be deleted before they can be processed, you can set the following configuration to ignore the error and continue processing:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

Frequently asked questions (FAQ)

An SQS queue URL already includes the region endpoint, so the region field doesn’t need to be set, correct?

You must explicitly set the region if your SQS queue is not in the same region as your Spark cluster.

sqsFetchInterval

  • If the value is less than 20 seconds, do we set the SQS long polling timeout to be that specific value? Yes.

  • If the value is greater than 20 seconds and if there’s data in the queue, do we keep creating SQS long polling requests with 20 second timeouts? We will make requests with long polling set to 20 seconds, but SQS will return immediately. You won’t wait 20 seconds.

  • If the value is greater than 20 seconds and if the queue is empty, do we create long polling requests with a 20 second timeout after the specified interval? We will make requests with long polling set to 20 seconds. If SQS doesn’t return anything, we will sleep the rest of the interval. We will not make any more requests to SQS for the duration of the interval, since SQS charges per REST API call.

If ignoreFileDeletion is False (default) and the object has been deleted, will it fail the whole pipeline?

Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.
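
If deleted files are expected and should be skipped rather than fail the stream, the option can be enabled on the reader; a minimal sketch, reusing the placeholders from the earlier example:

    # Sketch: skip file-deletion events instead of failing the stream.
    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .option("ignoreFileDeletion", "true") \
      .schema(...) \
      .load()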

How should I set maxFileAge?

SQS provides at-least-once message delivery semantics, so we need to keep state for deduplication. The default maxFileAge is 7 days, which is greater than the default SQS message retention period of 4 days. If you configure a longer retention period for messages in your queue, set maxFileAge accordingly.
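
For example, if you raise the queue’s message retention period to 14 days, a maxFileAge longer than that retention might look like the following sketch; the values are illustrative:

    # Illustrative: keep dedup state for 15 days, longer than a 14-day
    # message retention period on the queue.
    max_file_age_seconds = 15 * 24 * 60 * 60  # 1296000
    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .option("maxFileAge", max_file_age_seconds) \
      .schema(...) \
      .load()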