The Databricks ABS-AQS connector uses Azure Queue Storage (AQS) to provide an optimized file source that lets you find new files written to an Azure Blob Storage (ABS) container without repeatedly listing all of the files. This provides two major advantages:
- Lower latency: no need to list nested directory structures on ABS, which is slow and resource intensive.
- Lower costs: no more costly LIST API requests made to ABS.
The ABS-AQS source deletes messages from the AQS queue as events are acknowledged. If you would like to have other pipelines consuming from the same queue, set up a separate AQS queue for the optimized reader. You can set up multiple Event Grid Subscriptions to publish to different queues.
To use the ABS-AQS file source you must:

1. Set up ABS event notifications by leveraging Azure Event Grid Subscriptions and route them to AQS. See Reacting to Blob storage events.
2. Specify the `fileFormat`, `queueName`, and `connectionString` options and a schema. For example:
```python
spark.readStream \
  .format("abs-aqs") \
  .option("fileFormat", "json") \
  .option("queueName", ...) \
  .option("connectionString", ...) \
  .schema(...) \
  .load()
```
To authenticate with Azure Queue Storage and Blob Storage, use Shared Access Signature (SAS) tokens or storage account keys. Provide a connection string for the storage account where your queue is deployed; it must contain either your SAS token or your storage account access key. See Configure Azure Storage connection strings for more information.

You must also provide access to your Azure Blob Storage containers. See Azure Blob Storage for information on configuring access to your container.
We strongly recommend that you use Secrets for providing your connection strings.
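Connection strings follow Azure Storage's `key=value;` format. As an illustration (the helper function below is our own, not part of the connector), you can split one into its components; in practice, keep the full string in a secret and retrieve it at runtime rather than hard-coding it:

```python
def parse_connection_string(conn_str):
    """Split an Azure Storage connection string into its key=value parts."""
    parts = {}
    for segment in conn_str.split(";"):
        if segment:
            # partition (not split) keeps '=' padding inside base64 account keys intact
            key, _, value = segment.partition("=")
            parts[key] = value
    return parts

# Placeholder credentials -- never hard-code real keys.
conn = ("DefaultEndpointsProtocol=https;"
        "AccountName=mystorageaccount;"
        "AccountKey=<base64-key>;"
        "EndpointSuffix=core.windows.net")
fields = parse_connection_string(conn)
```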
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `queueName` | String | None (required param) | The name of the AQS queue. |
| `fileFormat` | String | None (required param) | The format of the files, such as `parquet`, `csv`, `json`, or `text`. |
| `connectionString` | String | None (required param) | The connection string to access your Azure Queue Storage account. |
| `queueFetchInterval` | A duration string, for example, `2m` for 2 minutes. | `"5s"` | How long to wait in between fetches if the queue is empty. Azure charges per API request to AQS, so if data isn't arriving frequently, this value can be set to a long duration. As long as the queue is not empty, we fetch continuously. If new files are created every 5 minutes, you might want to set a high `queueFetchInterval` to reduce AQS costs. |
| `pathRewrites` | A JSON string. | `"{}"` | If you use mount points, you can rewrite the prefix of the `container@storageAccount/key` path with the mount point. Only prefixes can be rewritten. |
| `ignoreFileDeletion` | Boolean | `false` | If you have lifecycle configurations or you delete the source files manually, you must set this option to `true`. |
| `maxFileAge` | Integer | 604800 | Determines how long (in seconds) file notifications are stored as state to prevent duplicate processing. |
| `allowOverwrites` | Boolean | `true` | Whether a blob that gets overwritten should be reprocessed. |
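To make the `pathRewrites` behavior concrete, here is a small standalone sketch (the mapping, mount point, and helper name are illustrative, not the connector's internals):

```python
import json

def apply_path_rewrites(path, path_rewrites_json):
    """Rewrite the longest matching prefix of `path` using a pathRewrites-style JSON mapping."""
    rewrites = json.loads(path_rewrites_json)
    # Try longer prefixes first so the most specific rule wins.
    for prefix in sorted(rewrites, key=len, reverse=True):
        if path.startswith(prefix):
            return rewrites[prefix] + path[len(prefix):]
    return path

# Hypothetical mapping from a container path to a DBFS mount point.
rules = '{"myContainer@myAccount/raw": "dbfs:/mnt/raw"}'
print(apply_path_rewrites("myContainer@myAccount/raw/2021/08/fileA.json", rules))
# prints dbfs:/mnt/raw/2021/08/fileA.json
```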
If you observe a lot of messages in the driver logs that look like `Fetched 0 new events and 3 old events.`, where you tend to observe a lot more old events than new, you should reduce the trigger interval of your stream.
- If `ignoreFileDeletion` is False (default) and the object has been deleted, will it fail the whole pipeline?
- Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.
- How should I set `maxFileAge`?
- Azure Queue Storage provides at-least-once message delivery semantics, therefore we need to keep state for deduplication. The default setting for `maxFileAge` is 7 days, which is equal to the maximum TTL of a message in the queue.
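Because delivery is at-least-once, the source must remember which files it has already seen for `maxFileAge` seconds. A simplified sketch of that deduplication state, with invented class and field names, might look like:

```python
import time

class FileNotificationState:
    """Track seen files for max_file_age seconds to drop duplicate notifications."""

    def __init__(self, max_file_age=604800):  # 7 days, matching the documented default
        self.max_file_age = max_file_age
        self.seen = {}  # path -> time first seen

    def should_process(self, path, now=None):
        now = time.time() if now is None else now
        # Expire entries older than max_file_age so state stays bounded.
        self.seen = {p: t for p, t in self.seen.items()
                     if now - t < self.max_file_age}
        if path in self.seen:
            return False  # duplicate delivery within the window: skip
        self.seen[path] = now
        return True
```

A duplicate notification arriving within the window is skipped; once the entry expires, the same path would be processed again, which is why `maxFileAge` should cover the queue's message TTL.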