Optimized S3 File Source in Structured Streaming using SQS¶
The Databricks S3-SQS connector uses Amazon Simple Queue Service (SQS) to provide an optimized S3 source that lets you find new files written to an S3 bucket without repeatedly listing all of the files. This provides two major advantages:
- Lower latency: no need to list large buckets on S3, which is slow and resource intensive.
- Lower costs: no more costly LIST API requests made to S3.
To use the S3-SQS file source, you must set up event notifications and route them to SQS. See the AWS documentation.
spark.readStream \ .format("s3-sqs") \ .option("fileFormat", "json") \ .option("queueUrl", ...) \ .schema(...) \ .load()
You must provide the
queueUrl option, and a schema to use the S3-SQS source.
The S3-SQS source deletes messages from the SQS queue as events are acknowledged. If you would like to have other pipelines consuming from the same queue, set up a separate SQS queue for the optimized reader. You can use SNS to publish messages to multiple SQS queues.
Trigger.Once() is not supported with the optimized file source.
Authenticate with Amazon SQS and S3¶
For authentication with SQS, we use Amazon’s default credential provider chain. We recommend that you launch your Databricks clusters with an IAM role that can access SQS and your S3 bucket.
|queueUrl||The URL string for the queue.||None (required param)||The URL of the SQS queue.|
|fileFormat||Supported file data sources.||None (required param)||The format of the files such as
|region||Region for the SQS queue.||Locally resolved region||The region the queue is defined in.|
|sqsFetchInterval||A duration string, e.g.
||How long to wait in between fetches if the queue is empty. AWS charges per API request to SQS. Therefore if data isn’t arriving frequently, this value can be set to a long duration. SQS supports long polling with a max duration of 20 seconds. If this value is set longer than 20 seconds, Databricks sleep for the remaining duration. As long as the queue is not empty, Databricks fetches continuously.|
|pathRewrites||A JSON string.||
||If you use mount points, you can rewrite the prefix of the
||If you have lifecycle configurations or you delete the source files manually, you must set this option to
If new files are created every 5 minutes, such as with Kinesis Firehose or CloudTrail logs, you might want to set a high
sqsFetchInterval to reduce SQS costs.
If you observe a lot of messages in the driver logs that look like
Fetched 0 new events and 3 old events., where you tend to observe a lot more old events than new, you can either reduce the trigger interval of your stream or increase the visibility timeout in your SQS queue.
- An SQS queue URL already has region endpoint in it, so
regionfield doesn’t need to be set, correct?
- You must explicitly set the region if your SQS queue is not in the same region as your Spark cluster.
sqsFetchInterval, if the value is less than 20 seconds, do we set the SQS long polling timeout to be that specific value?
sqsFetchInterval, if the value is greater than 20 seconds and if there’s data in the queue, do we keep creating SQS long polling requests with 20 second timeouts?
- We will make requests with long polling set to 20 seconds, but SQS will return immediately. You won’t wait 20 seconds.
sqsFetchInterval, if the value is greater than 20 seconds and if the queue is empty, do we create long polling requests with a 20 second timeout after the specified interval?
- We will make requests with long polling set to 20 seconds. If SQS doesn’t return anything, then we will sleep the rest of the interval. We will not make any more requests to SQS for the duration of the interval, since SQS charges per REST API call.
ignoreFileDeletionis False (default) and the object has been deleted, will it fail the whole pipeline?
- Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.