Optimized S3 File Source in Structured Streaming using SQS

The Databricks S3-SQS connector uses Amazon Simple Queue Service (SQS) to provide an optimized S3 source that lets you find new files written to an S3 bucket without repeatedly listing all of the files. This provides two major advantages:

  1. Lower latency: no need to list large buckets on S3, which is slow and resource intensive.
  2. Lower costs: no more costly LIST API requests made to S3.

To use the S3-SQS file source, you must set up event notifications and route them to SQS. See the AWS documentation.
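You typically configure the notification in the AWS console or CLI; as a rough boto3 sketch (the bucket name, queue ARN, and configuration ID below are hypothetical, and the queue's access policy must already allow S3 to send it messages):

import boto3

s3 = boto3.client("s3")

# Route all object-creation events in the bucket to the SQS queue.
# The bucket name and queue ARN are placeholders for illustration.
s3.put_bucket_notification_configuration(
    Bucket="my-streaming-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "Id": "s3-sqs-source",
                "QueueArn": "arn:aws:sqs:us-west-2:123456789012:my-streaming-queue",
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)

With notifications flowing into the queue, create the stream: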

spark.readStream \
  .format("s3-sqs") \
  .option("fileFormat", "json") \
  .option("queueUrl", ...) \
  .schema(...) \
  .load()

To use the S3-SQS source, you must provide the fileFormat and queueUrl options and a schema.
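As a fuller sketch (the queue URL, schema, and output paths below are hypothetical), a complete query might look like this:

from pyspark.sql.types import StructType, StringType, TimestampType

# Hypothetical schema for the JSON files landing in the bucket.
schema = (StructType()
    .add("id", StringType())
    .add("ts", TimestampType())
    .add("payload", StringType()))

df = (spark.readStream
    .format("s3-sqs")
    .option("fileFormat", "json")
    .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/my-streaming-queue")
    .schema(schema)
    .load())

# Write the stream out; the sink and checkpoint paths are placeholders.
(df.writeStream
    .format("parquet")
    .option("checkpointLocation", "/mnt/checkpoints/s3-sqs-demo")
    .start("/mnt/data-warehouse/events"))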

Warning

The S3-SQS source deletes messages from the SQS queue as events are acknowledged. If you want other pipelines to consume from the same queue, set up a separate SQS queue for the optimized reader; you can use SNS to publish messages to multiple SQS queues.
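As a rough sketch of that fan-out pattern (the topic and queue ARNs are hypothetical, and each queue's access policy must allow the topic to send it messages), you point the bucket's notification at an SNS topic and subscribe each queue to it:

import boto3

sns = boto3.client("sns")

# Subscribe each consumer's queue to the topic receiving the S3 events.
# All ARNs are placeholders for illustration.
for queue_arn in [
    "arn:aws:sqs:us-west-2:123456789012:s3-sqs-reader-queue",
    "arn:aws:sqs:us-west-2:123456789012:other-pipeline-queue",
]:
    sns.subscribe(
        TopicArn="arn:aws:sns:us-west-2:123456789012:s3-events-topic",
        Protocol="sqs",
        Endpoint=queue_arn,
    )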

Note

Trigger.Once() is not supported with the optimized file source.

Authenticating with Amazon SQS and S3

For authentication with SQS, the source uses Amazon's default credential provider chain. We recommend that you launch your Databricks clusters with an IAM role that can access SQS and your S3 bucket.

Note

The S3-SQS source requires sqs:DeleteMessageBatch, sqs:ReceiveMessage, and s3:GetObject permissions. If you hit Amazon: Access Denied exceptions, double-check that your user or profile has these permissions. See the S3 and SQS documentation for more details.
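As a sketch of an inline policy granting just these actions (the role name, policy name, and resource ARNs are hypothetical; scope them to your own queue and bucket):

import json
import boto3

iam = boto3.client("iam")

# Minimal policy covering only the permissions the source needs.
# All ARNs are placeholders for illustration.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessageBatch"],
            "Resource": "arn:aws:sqs:us-west-2:123456789012:my-streaming-queue",
        },
        {
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::my-streaming-bucket/*",
        },
    ],
}

# Attach the policy inline to the cluster's IAM role (name is a placeholder).
iam.put_role_policy(
    RoleName="databricks-cluster-role",
    PolicyName="s3-sqs-source-access",
    PolicyDocument=json.dumps(policy),
)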

Configuration

| Option | Value | Default | Description |
| --- | --- | --- | --- |
| queueUrl | The URL string for the queue | None (required) | The URL of the SQS queue. |
| fileFormat | A supported file data source | None (required) | The format of the files, such as parquet, json, csv, or text. |
| region | Region for the SQS queue | Locally resolved region | The region the queue is defined in. |
| sqsFetchInterval | A duration string, e.g. 2m for 2 minutes | 5s | How long to wait between fetches if the queue is empty. [1] |
| pathRewrites | A JSON string | {} | If you use mount points, you can rewrite the prefix of the bucket/key path with the mount point. [2] |
| ignoreFileDeletion | Boolean | false | If you have lifecycle configurations or you delete the source files manually, you must set this option to true. |

[1] AWS charges per API request to SQS. Therefore, if data isn't arriving frequently, this value can be set to a long duration. SQS supports long polling with a maximum duration of 20 seconds. If this value is set higher than 20 seconds, we will sleep for the remaining duration. As long as the queue is not empty, we will fetch continuously.

[2] Only prefixes can be rewritten. For example, given the configuration {"databricks-mounted-bucket/path": "/mnt/data-warehouse"}, the path databricks-mounted-bucket/path/2017/08/fileA.json will be rewritten to /mnt/data-warehouse/2017/08/fileA.json.

If new files are created every 5 minutes, such as with Kinesis Firehose or CloudTrail logs, you might want to set a high sqsFetchInterval to reduce SQS costs.
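For example (the values here are illustrative, and schema is the hypothetical one from the earlier sketch), both tuning options are passed as strings; since pathRewrites takes a JSON string, building it with json.dumps keeps the quoting straight:

import json

df = (spark.readStream
    .format("s3-sqs")
    .option("fileFormat", "json")
    .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/my-streaming-queue")
    # Files arrive roughly every 5 minutes, so poll SQS sparingly.
    .option("sqsFetchInterval", "5m")
    # Rewrite the raw bucket/key prefix to the DBFS mount point.
    .option("pathRewrites", json.dumps({"my-streaming-bucket/events": "/mnt/data-warehouse"}))
    .schema(schema)
    .load())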

If you observe a lot of messages in the driver logs that look like Fetched 0 new events and 3 old events., and you tend to see far more old events than new, either reduce the trigger interval of your stream or increase the visibility timeout on your SQS queue.
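Both remedies are small changes; as a sketch (the paths and queue URL are the same hypothetical values as above), the trigger interval is set on the stream writer and the visibility timeout on the queue itself:

import boto3

# Remedy 1: trigger the stream more often, so messages are consumed and
# deleted before their visibility timeout expires.
(df.writeStream
    .format("parquet")
    .option("checkpointLocation", "/mnt/checkpoints/s3-sqs-demo")
    .trigger(processingTime="30 seconds")
    .start("/mnt/data-warehouse/events"))

# Remedy 2: give in-flight messages a longer visibility timeout (in seconds).
boto3.client("sqs").set_queue_attributes(
    QueueUrl="https://sqs.us-west-2.amazonaws.com/123456789012/my-streaming-queue",
    Attributes={"VisibilityTimeout": "300"},
)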

FAQ

An SQS queue URL already contains its region endpoint, so the region option doesn't need to be set, correct?

You must explicitly set the region if your SQS queue is not in the same region as your Spark cluster.
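For example, if the cluster runs in us-west-2 but the queue lives in us-east-1 (regions and queue URL hypothetical, schema as in the earlier sketch), set the option explicitly:

df = (spark.readStream
    .format("s3-sqs")
    .option("fileFormat", "json")
    .option("queueUrl", "https://sqs.us-east-1.amazonaws.com/123456789012/my-streaming-queue")
    # The queue is in a different region than the cluster.
    .option("region", "us-east-1")
    .schema(schema)
    .load())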

For sqsFetchInterval, if the value is less than 20 seconds, do we set the SQS long polling timeout to that specific value?

Yes.

For sqsFetchInterval, if the value is greater than 20 seconds and there's data in the queue, do we keep creating SQS long polling requests with 20 second timeouts?

We will make requests with long polling set to 20 seconds, but SQS will return immediately. You won’t wait 20 seconds.

For sqsFetchInterval, if the value is greater than 20 seconds and the queue is empty, do we create long polling requests with a 20 second timeout after the specified interval?

We will make requests with long polling set to 20 seconds. If SQS doesn’t return anything, then we will sleep the rest of the interval. We will not make any more requests to SQS for the duration of the interval, since SQS charges per REST API call.

If ignoreFileDeletion is false (the default) and an object has been deleted, will it fail the whole pipeline?

Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.