Choosing between file notification and directory listing modes

Auto Loader supports two modes for detecting new files: directory listing and file notification.

  • Directory listing: Auto Loader identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configuration other than access to your data on cloud storage. In Databricks Runtime 9.1 and above, Auto Loader can automatically detect whether files are arriving at your cloud storage with lexical ordering and significantly reduce the number of API calls it needs to make to detect new files. See Incremental Listing for more details.

  • File notification: Auto Loader can automatically set up a notification service and queue service that subscribe to file events from the input directory. File notification mode is more performant and scalable for large input directories or a high volume of files, but requires additional cloud permissions to set up. See Leveraging file notifications for more details.
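As a minimal sketch, a directory listing stream needs no notification-related options; the file format, input path, schema location, checkpoint location, and output path below are placeholders.

val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")                      // format of the incoming files
  .option("cloudFiles.schemaLocation", <schema-location>)   // where Auto Loader tracks the inferred schema
  .load(<input-path>)

df.writeStream
  .format("delta")
  .option("checkpointLocation", <checkpoint-location>)      // file-discovery state (RocksDB) is stored here
  .start(<output-path>)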

The availability of these modes is summarized in the following table.

| Cloud Storage | Directory Listing | Incremental Listing | File Notifications |
| --- | --- | --- | --- |
| AWS S3 | All versions | Databricks Runtime 9.1 and above | All versions |
| ADLS Gen2 | All versions | Databricks Runtime 9.1 and above | All versions |
| GCS | All versions | Databricks Runtime 9.1 and above | Databricks Runtime 9.1 and above |
| Azure Blob Storage | All versions | Unsupported | All versions |
| ADLS Gen1 | Databricks Runtime 7.3 and above | Unsupported | Unsupported |
| DBFS | All versions | For mount points only | For mount points only |

As files are discovered, their metadata is persisted in a scalable key-value store (RocksDB) in the checkpoint location of your Auto Loader pipeline. This key-value store ensures that data is processed exactly once. You can switch file discovery modes across stream restarts and still obtain exactly-once data processing guarantees. In fact, this is how Auto Loader can both perform a backfill on a directory containing existing files and concurrently process new files that are being discovered through file notifications.

In case of failures, Auto Loader can resume from where it left off using information stored in the checkpoint location and continue to provide exactly-once guarantees when writing data into Delta Lake. You don’t need to maintain or manage any state yourself to achieve fault tolerance or exactly-once semantics.

Optimized directory listing

Note

Available in Databricks Runtime 9.0 and above.

Auto Loader can discover files on cloud storage systems using directory listing more efficiently than other alternatives. For example, if files were uploaded every 5 minutes as /some/path/YYYY/MM/DD/HH/fileName, then to find all the files in these directories, the Apache Spark file source would list all subdirectories in parallel, causing 1 (base directory) + 365 (days) * 24 (hours per day) = 8,761 LIST API calls to storage over a year. By receiving a flattened response from storage, Auto Loader reduces the number of API calls to the number of files in storage divided by the number of results returned by each API call (1000 with S3, 5000 with ADLS Gen2, and 1024 with GCS), greatly reducing your cloud costs.

Incremental Listing

Note

Available in Databricks Runtime 9.1 LTS and above.

Incremental listing is available for Azure Data Lake Storage Gen2 (abfss://), S3 (s3://) and GCS (gs://).

For lexicographically generated files, Auto Loader can leverage the lexical file ordering and optimized listing APIs to improve the efficiency of directory listing by listing from recently ingested files rather than listing the contents of the entire directory.

By default, Auto Loader will automatically detect whether a given directory is applicable for incremental listing by checking and comparing file paths of previously completed directory listings. To ensure eventual completeness of data in auto mode, Auto Loader will automatically trigger a full directory list after completing 7 consecutive incremental lists. You can control the frequency of full directory lists by setting cloudFiles.backfillInterval to trigger asynchronous backfills at a given interval.

You can explicitly enable or disable incremental listing by setting cloudFiles.useIncrementalListing to "true" or "false" (the default is "auto"). When explicitly enabled, Auto Loader will not trigger full directory lists unless a backfill interval is set. Services such as AWS Kinesis Firehose, AWS DMS, and Azure Data Factory can be configured to upload files to a storage system in lexical order. See Lexical ordering of files for more examples of lexical directory structures.
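As a sketch, explicitly enabling incremental listing and pairing it with a periodic full listing might look like the following; the file format, paths, and interval are placeholders.

val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.useIncrementalListing", "true")   // default is "auto"
  .option("cloudFiles.backfillInterval", "1 week")      // periodically trigger a full directory list
  .option("cloudFiles.schemaLocation", <schema-location>)
  .load(<input-path>)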

Leveraging file notifications

When files do not arrive in your bucket with lexical ordering, you can use file notifications to scale Auto Loader to ingest millions of files an hour. Auto Loader can set up file notifications for you automatically when you set the option cloudFiles.useNotifications to true and provide the necessary permissions to create cloud resources. You may also need to provide additional options to authorize Auto Loader to create these resources. The following table summarizes which resources Auto Loader creates; a minimal configuration sketch follows the table.

| Cloud Storage | Subscription Service | Queue Service | Prefix (1) | Limit (2) |
| --- | --- | --- | --- | --- |
| AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | 100 per S3 bucket |
| ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 per storage account |
| GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | 100 per GCS bucket |
| Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 per storage account |

  1. Auto Loader will name the resources with this prefix

  2. How many concurrent file notification pipelines can be launched
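As a minimal sketch, enabling file notification mode on AWS might look like the following. The region, input path, schema location, and file format are placeholders; Auto Loader creates the SNS topic and SQS queue itself when it has the permissions described in Required permissions for setting up file notification resources.

val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")   // use file notifications instead of directory listing
  .option("cloudFiles.region", <aws-region>)       // AWS only: region where the S3 bucket resides
  .option("cloudFiles.schemaLocation", <schema-location>)
  .load(<input-path>)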

If you cannot provide Auto Loader with the necessary permissions to create file notification services, you can ask your cloud administrators to use the setUpNotificationServices method, described in Managing file notification resources below, in a Databricks Scala notebook to create the file notification services for you. Alternatively, your cloud administrators can set up the file notification services manually and provide you with the queue identifier to leverage file notifications. See File notification options for more details.
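If an administrator has already created the queue, the stream can simply be pointed at it. The sketch below assumes AWS, where the queue identifier is passed as cloudFiles.queueUrl; the option name differs on other clouds (see File notification options), and the paths are placeholders.

val df = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.useNotifications", "true")
  .option("cloudFiles.queueUrl", <sqs-queue-url>)   // identifier of the pre-created queue
  .option("cloudFiles.schemaLocation", <schema-location>)
  .load(<input-path>)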

You can switch between file notifications and directory listing at any time and still maintain exactly-once data processing guarantees.

Note

Under very rare conditions, cloud providers may fail to deliver file events, and they do not provide strict SLAs on the latency of file events. If data completeness is a requirement, Databricks recommends that you trigger regular backfills with Auto Loader by using the cloudFiles.backfillInterval option to guarantee that all files are discovered within a given SLA. Triggering regular backfills will not cause duplicates.

If you require running more than the limited number of file notification pipelines for a given storage account, you can:

  • Consider rearchitecting how files are uploaded to leverage incremental listing instead of file notifications

  • Leverage a service such as AWS Lambda, Azure Functions, or Google Cloud Functions to fan out notifications from a single queue that listens to an entire container or bucket into directory specific queues

File notification events

AWS S3 provides an ObjectCreated event when a file is uploaded to an S3 bucket regardless of whether it was uploaded by a put or multi-part upload.

ADLS Gen2 provides different event notifications for files appearing in your Gen2 container.

  • Auto Loader listens for the FlushWithClose event for processing a file.

  • Auto Loader streams created with Databricks Runtime 8.3 and after support the RenameFile action for discovering files. RenameFile actions will require an API request to the storage system to get the size of the renamed file.

  • Auto Loader streams created with Databricks Runtime 9.0 and after support the RenameDirectory action for discovering files. RenameDirectory actions will require API requests to the storage system to list the contents of the renamed directory.

Google Cloud Storage provides an OBJECT_FINALIZE event when a file is uploaded, which includes overwrites and file copies. Failed uploads do not generate this event.

Managing file notification resources

You can use Scala APIs to manage the notification and queuing services created by Auto Loader. You must configure the resource setup permissions described in Permissions before using this API.

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) to create a queue and a subscription with the name <prefix>-<resource-suffix> (the prefix depends on the storage system, as summarized in Leveraging file notifications). If there is an existing resource with the same name, Databricks reuses the existing resource instead of creating a new one. This function returns a queue identifier that you can pass to the cloudFiles source using the identifier in File notification options. This allows the cloudFiles source user to have fewer permissions than the user who creates the resources. See Permissions.

Provide the "path" option to newManager only if calling setUpNotificationServices; it is not needed for listNotificationServices or tearDownNotificationServices. This is the same path that you use when running a streaming query.

| Cloud Storage | Setup API | List API | Tear down API |
| --- | --- | --- | --- |
| AWS S3 | All versions | All versions | All versions |
| ADLS Gen2 | All versions | All versions | All versions |
| GCS | Databricks Runtime 9.1 and above | Databricks Runtime 9.1 and above | Databricks Runtime 9.1 and above |
| Azure Blob Storage | All versions | All versions | All versions |
| ADLS Gen1 | Unsupported | Unsupported | Unsupported |

Lexical ordering of files

For files to be lexically ordered, new files that are uploaded need to have a prefix that is lexicographically greater than existing files. Some examples of lexically ordered directories are shown below.

Versioned files

Delta Lake tables commit to their transaction logs in lexical order.

<path_to_table>/_delta_log/00000000000000000000.json
<path_to_table>/_delta_log/00000000000000000001.json <- guaranteed to be written after version 0
<path_to_table>/_delta_log/00000000000000000002.json <- guaranteed to be written after version 1
...

AWS DMS uploads CDC files to AWS S3 in a versioned manner.

database_schema_name/table_name/LOAD00000001.csv
database_schema_name/table_name/LOAD00000002.csv
...

Date partitioned files

Files can be uploaded in a date-partitioned format to leverage incremental listing. Some examples of this are:

// <base_path>/yyyy/MM/dd/HH:mm:ss-randomString
<base_path>/2021/12/01/10:11:23-b1662ecd-e05e-4bb7-a125-ad81f6e859b4.json
<base_path>/2021/12/01/10:11:23-b9794cf3-3f60-4b8d-ae11-8ea320fad9d1.json
...

// <base_path>/year=yyyy/month=MM/day=dd/hour=HH/minute=mm/randomString
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/442463e5-f6fe-458a-8f69-a06aa970fc69.csv
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/8f00988b-46be-4112-808d-6a35aead0d44.csv <- this may be uploaded before the file above as long as processing happens less frequently than a minute

When files are uploaded with date partitioning, some things to keep in mind are:

  • Months, days, hours, and minutes need to be left-padded with zeros to ensure lexical ordering (for example, files should be uploaded as hour=03 instead of hour=3, and 2021/05/03 instead of 2021/5/3).

  • Files don’t necessarily have to be uploaded in lexical order within the deepest directory as long as processing happens less frequently than the parent directory’s time granularity.

Services such as AWS Kinesis Firehose, AWS DMS, and Azure Data Factory can be configured to upload files with date-partitioned lexical ordering.

Required permissions for setting up file notification resources

ADLS Gen2 and Azure Blob Storage

You must have read permissions for the input directory. See Azure Blob Storage.

To use file notification mode, you must provide authentication credentials for setting up and accessing the event notification services. In Databricks Runtime 8.1 and above, you only need a service principal for authentication. For Databricks Runtime 8.0 and below, you must provide both a service principal and a connection string.

  • Service principal - using Azure built-in roles

    Create an Azure Active Directory app and service principal in the form of client ID and client secret.

    Assign this app the following roles to the storage account in which the input path resides:

    • Contributor: This role is for setting up resources in your storage account, such as queues and event subscriptions.

    • Storage Queue Data Contributor: This role is for performing queue operations such as retrieving and deleting messages from the queues. This role is required in Databricks Runtime 8.1 and above only when you provide a service principal without a connection string.

    Assign this app the following role to the related resource group:

    • EventGrid EventSubscription Contributor: This role is for performing Event Grid event subscription operations, such as creating or listing event subscriptions.

    For more information, see Assign Azure roles using the Azure portal.

  • Service principal - using custom role

    If you are concerned about the excessive permissions required by the preceding roles, you can create a custom role with at least the following permissions, listed below in Azure role JSON format:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
        ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Then, you may assign this custom role to your app.

    For more information, see Assign Azure roles using the Azure portal.

  • Connection string

    If you are using Databricks Runtime 8.1 or above, you do not need a connection string.

    If you are using Databricks Runtime 8.0 or below, you must provide a connection string to authenticate for Azure Queue Storage operations, such as creating a queue and retrieving and deleting messages from the queue. The queue is created in the same storage account in which the input path resides. You can find your connection string in your account key or shared access signature (SAS). When configuring an SAS token, you must provide the permissions shown in the following image:

    (Image: Auto Loader SAS token permissions)

AWS S3

You must have read permissions for the input directory. See S3 connection details for more details.

To use file notification mode, attach the following JSON policy document to your IAM user or role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

where:

  • <bucket-name>: The S3 bucket name where your stream will read files, for example, auto-logs. You can use * as a wildcard, for example, databricks-*-logs. To find out the underlying S3 bucket for your DBFS path, you can list all the DBFS mount points in a notebook by running %fs mounts.

  • <region>: The AWS region where the S3 bucket resides, for example, us-west-2. If you don’t want to specify the region, use *.

  • <account-number>: The AWS account number that owns the S3 bucket, for example, 123456789012. If you don’t want to specify the account number, use *.

The string databricks-auto-ingest-* in the SQS and SNS ARN specification is the name prefix that the cloudFiles source uses when creating SQS and SNS services. Since Databricks sets up the notification services in the initial run of the stream, you can use a policy with reduced permissions after the initial run (for example, stop the stream and then restart it).

Note

The preceding policy is concerned only with the permissions needed for setting up file notification services, namely S3 bucket notification, SNS, and SQS services, and assumes you already have read access to the S3 bucket. If you need to add S3 read-only permissions, add the following to the Action list in the DatabricksAutoLoaderSetup statement in the JSON document:

  • s3:ListBucket

  • s3:GetObject

Reduced permissions after initial setup

The resource setup permissions described above are required only during the initial run of the stream. After the first run, you can switch to the following IAM policy with reduced permissions.

Important

With the reduced permissions, you won’t be able to start new streaming queries or recreate resources in case of failures (for example, if the SQS queue is accidentally deleted); you also won’t be able to use the cloud resource management API to list or tear down resources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Securely ingest data in a different AWS account

Auto Loader can load data across AWS accounts by assuming an IAM role. After setting up the temporary security credentials created by AssumeRole, you can have Auto Loader load cloud files across accounts. To set up Auto Loader for cross-account access, follow the documentation: Secure access to S3 buckets across accounts using instance profiles with an AssumeRole policy. Make sure you:

  • Verify that you have the AssumeRole meta role assigned to the cluster.

  • Configure the cluster’s Spark configuration to include the following properties:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

GCS

You must have list and get permissions on your GCS bucket and on all the objects. For details, see the Google documentation on IAM permissions.

To use file notification mode, you need to add permissions for the GCS service account and the account used to access the Google Cloud Pub/Sub resources.

Add the Pub/Sub Publisher role to the GCS service account. This will allow the account to publish event notification messages from your GCS buckets to Google Cloud Pub/Sub.

For the service account used to access the Google Cloud Pub/Sub resources, you need to add the following permissions:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

To do this, you can either create an IAM custom role with these permissions or assign pre-existing GCP roles that cover these permissions.

Finding the GCS Service Account

In the Google Cloud Console for the corresponding project, navigate to Cloud Storage > Settings. On that page, you should see a section titled “Cloud Storage Service Account” containing the email of the GCS service account.

(Image: GCS Service Account)

Creating a Custom Google Cloud IAM Role for File Notification Mode

In the Google Cloud console for the corresponding project, navigate to IAM & Admin > Roles. Then, either create a role at the top or update an existing role. In the screen for role creation or edit, click Add Permissions. A menu should then pop up in which you can add the desired permissions to the role.

(Image: GCP IAM Custom Roles)

Troubleshooting

Error:

java.lang.RuntimeException: Failed to create event grid subscription.

If you see this error message when you run Auto Loader for the first time, Event Grid is not registered as a resource provider in your Azure subscription. To register it in the Azure portal:

  1. Go to your subscription.

  2. Click Resource Providers under the Settings section.

  3. Register the provider Microsoft.EventGrid.

Error:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

If you see this error message when you run Auto Loader for the first time, ensure that you have assigned the Contributor role to your service principal for Event Grid as well as for your storage account.