The Databricks S3-SQS connector uses Amazon Simple Queue Service (SQS) to provide an optimized Amazon S3 source that lets you find new files written to an S3 bucket without repeatedly listing all of the files. This provides two major advantages:
- Lower latency: no need to list large buckets on S3, which is slow and resource intensive.
- Lower costs: no more costly LIST API requests made to S3.
The S3-SQS source deletes messages from the SQS queue as events are acknowledged. If you would like to have other pipelines consuming from the same queue, set up a separate SQS queue for the optimized reader. You can use SNS to publish messages to multiple SQS queues.
To use the S3-SQS file source you must:
- Set up event notifications and route them to SQS. See Configuring Amazon S3 Event Notifications.
- Specify the fileFormat and queueUrl options and a schema. For example:

    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .schema(...) \
      .load()
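The first step, routing S3 event notifications to SQS, can be sketched as a notification configuration document. This is a minimal sketch, not the documented setup procedure: the helper name `s3_to_sqs_notification`, the queue ARN, and the `logs/` prefix are hypothetical. The dictionary follows the shape that the S3 `PutBucketNotificationConfiguration` API accepts, so it could be passed as `NotificationConfiguration` to boto3's `put_bucket_notification_configuration`.

```python
import json


def s3_to_sqs_notification(queue_arn, prefix=None):
    """Build a bucket notification configuration that sends
    object-created events to a single SQS queue."""
    queue_config = {
        "QueueArn": queue_arn,
        # Notify on every kind of object creation (PUT, POST, COPY, multipart).
        "Events": ["s3:ObjectCreated:*"],
    }
    if prefix is not None:
        # Optionally restrict notifications to keys under one prefix.
        queue_config["Filter"] = {
            "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
        }
    return {"QueueConfigurations": [queue_config]}


# Hypothetical queue ARN and prefix, for illustration only.
config = s3_to_sqs_notification(
    "arn:aws:sqs:us-west-2:123456789012:example-queue", prefix="logs/"
)
print(json.dumps(config, indent=2))
```

The queue's access policy must also allow S3 to send messages to it; that part is not shown here.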
Trigger.Once() is supported with this source since Databricks Runtime 4.2.
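In PySpark, a run-once trigger is requested with `trigger(once=True)` on the stream writer. The following is a minimal sketch under stated assumptions: the function name, the Parquet sink, and the output and checkpoint paths are hypothetical, and the schema is passed as a DDL string rather than a `StructType`.

```python
def start_s3_sqs_once(spark, queue_url, schema_ddl, output_path, checkpoint_path):
    """Read files announced on an SQS queue, process what is available
    in a single batch, then stop."""
    events = (
        spark.readStream
        .format("s3-sqs")
        .option("fileFormat", "json")
        .option("queueUrl", queue_url)
        .schema(schema_ddl)  # for example "id LONG, body STRING"
        .load()
    )
    return (
        events.writeStream
        .format("parquet")  # assumption: any supported sink would do
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)  # PySpark equivalent of Trigger.Once()
        .start(output_path)
    )
```

On a cluster, call it with the active `spark` session; the returned streaming query terminates on its own once the batch has been written.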
Databricks uses Amazon’s default credential provider chain for authentication to SQS. We recommend that you launch your Databricks clusters with an instance profile that can access SQS and your S3 bucket.
This source requires s3:GetObject permissions. If you experience Amazon: Access Denied exceptions, check that your user or profile has these permissions. See Using Identity-Based (IAM) Policies for Amazon SQS and Bucket Policy Examples for details.
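As an illustration, an IAM policy for the instance profile might look like the sketch below. Only `s3:GetObject` is named in the text above. The SQS actions `sqs:ReceiveMessage` and `sqs:DeleteMessage` are an assumption, inferred from the fact that the source reads messages from the queue and deletes them once acknowledged. The helper name, bucket name, and queue ARN are hypothetical.

```python
import json


def s3_sqs_reader_policy(bucket, queue_arn):
    """Build an IAM policy document for a profile that reads files from
    one bucket and consumes notifications from one queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                # Assumption: actions needed to receive and acknowledge messages.
                "Effect": "Allow",
                "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
                "Resource": queue_arn,
            },
        ],
    }


policy = s3_sqs_reader_policy(
    "example-bucket", "arn:aws:sqs:us-west-2:123456789012:example-queue"
)
print(json.dumps(policy, indent=2))
```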
| Option | Type | Default | Description |
|---|---|---|---|
| queueUrl | The URL string for the queue. | None (required param) | The URL of the SQS queue. |
| fileFormat | Supported file data sources. | None (required param) | The format of the files, such as json. |
| region | Region for the SQS queue. | Locally resolved region | The region the queue is defined in. |
| sqsFetchInterval | A duration string. | | How long to wait between fetches if the queue is empty. AWS charges per API request to SQS, so if data doesn't arrive frequently, this value can be set to a long duration. SQS supports long polling with a maximum duration of 20 seconds. If this value is longer than 20 seconds, we sleep for the remaining duration. As long as the queue is not empty, we fetch continuously. If new files are created every 5 minutes, such as with Kinesis Firehose or CloudTrail logs, you might want to set a high sqsFetchInterval to reduce SQS costs. |
| pathRewrites | A JSON string. | | If you use mount points, you can rewrite the prefix of the file path. |
| maxFileAge | Integer | 604800 | Determines how long (in seconds) file notifications are stored as state to prevent duplicate processing. |
| | | | Whether a blob that gets overwritten should be reprocessed. |
| maxFilesPerTrigger | Integer | 1000 | Maximum number of new files to be considered in every trigger. |
| excludeRegex | String | None | Exclude the files based on path. |
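The options above can be collected in a dictionary and handed to the reader in one call. This is a minimal sketch: the helper `s3_sqs_options` is hypothetical, the `"5m"` duration string and the regex are illustrative values, and the queue URL is made up. Only option names that appear in the table are accepted.

```python
# Optional settings named in the table above.
OPTIONAL_SETTINGS = {
    "region",
    "sqsFetchInterval",
    "pathRewrites",
    "maxFileAge",
    "maxFilesPerTrigger",
    "excludeRegex",
}


def s3_sqs_options(queue_url, file_format, **tuning):
    """Combine the two required options with optional tuning options,
    rejecting names that are not in the table."""
    unknown = set(tuning) - OPTIONAL_SETTINGS
    if unknown:
        raise ValueError(f"Unknown S3-SQS options: {sorted(unknown)}")
    options = {"queueUrl": queue_url, "fileFormat": file_format}
    # Spark reader options are passed as strings.
    options.update({name: str(value) for name, value in tuning.items()})
    return options


opts = s3_sqs_options(
    "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue",
    "json",
    sqsFetchInterval="5m",      # assumed duration-string format
    maxFilesPerTrigger=500,
    excludeRegex=r".*\.tmp$",   # skip temporary files
)
print(opts)
```

The result can then be applied with `spark.readStream.format("s3-sqs").options(**opts)` before supplying the schema and calling `load()`.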
If the driver logs contain many messages that look like "Fetched 0 new events and 3 old events." and old events tend to outnumber new events, you can either reduce the trigger interval of your stream or increase the visibility timeout in your SQS queue.
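To judge whether old events dominate, the counts can be pulled out of the driver log. This is a rough sketch that assumes the log lines contain exactly the message format quoted above; the function name and the sample lines are hypothetical.

```python
import re

# Matches the driver log message quoted in the text.
EVENT_LINE = re.compile(r"Fetched (\d+) new events and (\d+) old events\.")


def old_event_share(log_lines):
    """Return the fraction of fetched events that were old (already seen).

    Returns 0.0 when no matching lines or no events are found.
    """
    new_total = 0
    old_total = 0
    for line in log_lines:
        match = EVENT_LINE.search(line)
        if match:
            new_total += int(match.group(1))
            old_total += int(match.group(2))
    total = new_total + old_total
    return old_total / total if total else 0.0


sample = [
    "Fetched 0 new events and 3 old events.",
    "Fetched 2 new events and 1 old events.",
    "an unrelated log line",
]
share = old_event_share(sample)
print(f"{share:.0%} of fetched events were old")
```

A share that stays well above one half suggests that messages become visible again before the stream acknowledges them.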
If you are consuming files from a location on S3 where you expect that some files may be deleted before they can be processed, you can set the following configuration to ignore the error and continue processing:
An SQS queue URL already has the region endpoint in it, so the region field doesn't need to be set, correct?
You must explicitly set the region if your SQS queue is not in the same region as your Spark cluster.
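Since the region has to be passed explicitly in that case, one option is to derive it from the queue URL. This is a sketch that assumes the common URL shape `https://sqs.<region>.amazonaws.com/<account>/<queue>`; other endpoint shapes return `None` and need the region supplied by hand. The function name and the URL are hypothetical.

```python
from urllib.parse import urlparse


def region_from_queue_url(queue_url):
    """Extract the region from a queue URL whose host looks like
    sqs.<region>.amazonaws.com; return None for any other host."""
    host = urlparse(queue_url).hostname or ""
    parts = host.split(".")
    if len(parts) == 4 and parts[0] == "sqs" and parts[2:] == ["amazonaws", "com"]:
        return parts[1]
    return None


queue_url = "https://sqs.us-west-2.amazonaws.com/123456789012/example-queue"
options = {"queueUrl": queue_url, "fileFormat": "json"}

region = region_from_queue_url(queue_url)
if region is not None:
    # Set the region explicitly so it does not depend on the cluster's region.
    options["region"] = region
print(options)
```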
- If sqsFetchInterval is less than 20 seconds, do we set the SQS long polling timeout to that specific value? Yes.
- If sqsFetchInterval is greater than 20 seconds and there's data in the queue, do we keep creating SQS long polling requests with 20 second timeouts? We make requests with long polling set to 20 seconds, but SQS returns immediately. You won't wait 20 seconds.
- If sqsFetchInterval is greater than 20 seconds and the queue is empty, do we create long polling requests with a 20 second timeout after the specified interval? We make requests with long polling set to 20 seconds. If SQS doesn't return anything, we sleep for the rest of the interval. We don't make any more requests to SQS for the duration of the interval, since SQS charges per REST API call.
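The three cases above reduce to a small rule, sketched here as a pure function. This models the behavior described in the answers and is not the connector's actual code; the function names are hypothetical and intervals are whole seconds greater than zero.

```python
SQS_MAX_LONG_POLL_SECONDS = 20  # maximum long-polling duration SQS supports
SECONDS_PER_DAY = 24 * 60 * 60


def plan_fetch(fetch_interval_seconds, queue_empty):
    """Return (long_poll_timeout, sleep_after) in seconds for one fetch.

    The long-poll timeout is an upper bound: when messages are available,
    SQS answers immediately.
    """
    long_poll = min(fetch_interval_seconds, SQS_MAX_LONG_POLL_SECONDS)
    # Only an empty queue triggers the sleep for the rest of the interval.
    sleep_after = fetch_interval_seconds - long_poll if queue_empty else 0
    return long_poll, sleep_after


def empty_queue_requests_per_day(fetch_interval_seconds):
    """Approximate number of SQS requests per day while the queue stays empty."""
    return SECONDS_PER_DAY // fetch_interval_seconds


print(plan_fetch(10, queue_empty=True))    # interval below the 20 second cap
print(plan_fetch(300, queue_empty=False))  # data available, no sleep
print(plan_fetch(300, queue_empty=True))   # empty queue, sleep out the interval
print(empty_queue_requests_per_day(300), empty_queue_requests_per_day(20))
```

With an empty queue, a 5 minute interval works out to 288 requests per day, compared with 4320 at a 20 second interval.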
If ignoreFileDeletion is False (default) and the object has been deleted, will it fail the whole pipeline?
Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.
How should I set maxFileAge?
SQS provides at-least-once message delivery semantics, so we need to keep state for deduplication. The default setting for maxFileAge is 7 days, which is greater than the default TTL of a message in SQS, which is 4 days. If you set the retention duration of a message in your queue to be higher, set this configuration accordingly.
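The arithmetic behind this advice can be sketched as follows. The 3 day margin simply mirrors the gap between the 7 day default and the 4 day SQS default mentioned above; it is an assumption, not a documented rule, and the function name is hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def max_file_age_for_retention(retention_days, margin_days=3):
    """Return a maxFileAge value in seconds that outlasts the queue's
    message retention period by a safety margin."""
    return (retention_days + margin_days) * SECONDS_PER_DAY


# Default SQS retention of 4 days gives the 7 day default.
default_age = max_file_age_for_retention(4)

# A queue configured with the SQS maximum retention of 14 days.
long_retention_age = max_file_age_for_retention(14)

print(default_age, long_retention_age)
```

The resulting number of seconds is the value to pass as the maxFileAge option.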