Auto Loaderファイル通知モードとは何ですか?

ファイル通知モードでは、 Auto Loader は、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、1 時間に数百万のファイルを取り込むようにスケール Auto Loader できます。 ディレクトリリストモードと比較すると、ファイル通知モードは、大規模な入力ディレクトリや大量のファイルに対してパフォーマンスと拡張性に優れていますが、追加のクラウド権限が必要です。

ファイル通知とディレクトリリストをいつでも切り替えることができ、正確なデータ処理の保証を維持します。

警告

Auto Loaderのソース パスの変更は、ファイル通知モードではサポートされていません。ファイル通知モードが使用されている場合、パスが変更されると、ディレクトリの更新時に新しいディレクトリに既に存在するファイルの取り込みに失敗する可能性があります。

ファイル通知モードは、シングルユーザーコンピュートでのみサポートされています。

Auto Loader ファイル通知モードで使用されるクラウドリソース

重要

ファイル通知モードのクラウド インフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 見る:

Auto Loader は、オプション cloudFiles.useNotificationstrue に設定し、クラウド リソースを作成するために必要なアクセス許可を提供すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限を に付与するための 追加オプション を提供する必要がある場合があります。Auto Loader

次の表は、 Auto Loaderによって作成されるリソースをまとめたものです。

クラウドストレージ

サブスクリプションサービス

キューサービス

接頭辞*

制限**

AWS S3の

AWS SNSの

AWS SQSの

Databricks の自動取り込み

S3バケットあたり100個

ADLS Gen2

Azure イベント グリッド

Azure キュー ストレージ

Databricks

ストレージ アカウントあたり 500

GCSの

Google Pub/Sub

Google Pub/Sub

Databricks の自動取り込み

GCSバケットあたり100個

Azure Blobストレージ

Azure イベント グリッド

Azure キュー ストレージ

Databricks

ストレージ アカウントあたり 500

* Auto Loader は、このプレフィックスを使用してリソースに名前を付けます。

** 起動できる並列ファイル通知パイプラインの数

特定のストレージ アカウントに対して制限された数を超えるファイル通知パイプラインを実行する必要がある場合は、次のことができます。

  • AWS Lambda、Azure Functions、Google Cloud Functions などのサービスを活用して、コンテナまたはバケット全体をリッスンする 1 つのキューからディレクトリ固有のキューに通知を分散させます。

ファイル通知イベント

AWS S3 は、ファイルが S3 バケットにアップロードされると、プットアップロードとマルチパートアップロードのどちらによってアップロードされたかに関係なく、 ObjectCreated イベントを提供します。

ADLS Gen2 では、Gen2 コンテナーに表示されるファイルに対してさまざまなイベント通知が提供されます。

  • Auto Loader は、ファイルを処理するために FlushWithClose イベントをリッスンします。

  • Auto Loader ストリームは、ファイルを検出するためのRenameFileアクションをサポートします。 RenameFile アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージシステムへのAPIリクエストが必要です。

  • Auto LoaderDatabricks Runtime9.0RenameDirectory 以降で作成された ストリームは、ファイルを検出するための アクションをサポートします。RenameDirectory アクションでは、名前が変更されたディレクトリの内容を一覧表示するために、ストレージシステムへのAPIリクエストが必要です。

Google Cloud ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。

注:

クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの遅延に関する厳格なSLAを提供していません。 Databricks では、cloudFiles.backfillInterval オプションを使用して Auto Loader で定期的なバックフィルをトリガーし、データの完全性が要件である場合は、特定の SLA 内のすべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。

ADLS Gen2 および Azure Blob ストレージのファイル通知を構成するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 「Azure Blob Storage」を参照してください。

ファイル通知モードを使用するには、イベント通知サービスを設定してアクセスするための認証資格情報を指定する必要があります。 認証に必要なのはサービスプリンシパルだけです。

  • サービスプリンシパル - Azure 組み込みロールの使用

    Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービスプリンシパルをクライアント ID とクライアント シークレットの形式で作成します。

    このアプリに、入力パスが存在するストレージ アカウントに次のロールを割り当てます。

    • 共同作成者: このロールは、キューやイベント サブスクリプションなど、ストレージ アカウント内のリソースを設定するためのものです。

    • ストレージ キュー データ共同作成者: このロールは、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。 このロールは、接続文字列なしでサービスプリンシパルを提供する場合にのみ必要です。

    このアプリに、関連するリソース グループに次のロールを割り当てます。

    • EventGrid EventSubscription 共同作成者: このロールは、イベント サブスクリプションの作成や一覧表示など、イベント グリッド サブスクリプション操作を実行するためのものです。

    詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。

  • サービスプリンシパル - カスタムロールの使用

    上記のロールに必要な過剰なアクセス許可が懸念される場合は、少なくとも次のアクセス許可を持つ カスタム ロール を作成できます (以下、Azure ロール JSON 形式)。

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    その後、このカスタムロールをアプリに割り当てることができます。

    詳細については、「 Azureポータルを使用して Azure ロールを割り当てる」を参照してください。

自動ローダーのアクセス許可

一般的なエラーのトラブルシューティング

エラー:

java.lang.RuntimeException: Failed to create event grid subscription.

Auto Loader を初めて実行するときにこのエラー メッセージが表示される場合は、Event Grid が Azure サブスクリプションにリソース プロバイダーとして登録されていません。これを Azure portal に登録するには:

  1. サブスクリプションに移動します。

  2. 「設定」セクションの 「リソース・プロバイダ 」をクリックします。

  3. 登録する the provider Microsoft.EventGrid.

エラー:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Auto Loader を初めて実行するときにこのエラー メッセージが表示される場合は、Event Grid のサービスプリンシパルとストレージ アカウントに対して共同作成者ロールが付与されていることを確認してください。

AWS S3のファイル通知を設定するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 詳細については、 S3 接続の詳細 を参照してください。

ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

ここで、

  • <bucket-name>: ストリームがファイルを読み取る S3 バケット名 (例: auto-logs)。 ワイルドカードとして * を使用できます (例: databricks-*-logs)。 DBFS パスの基になる S3 バケットを見つけるには、 %fs mountsを実行して、ノートブック内のすべての DBFS マウント ポイントを一覧表示できます。

  • <region>: S3 バケットが存在する AWS リージョン (例: us-west-2)。 地域を指定しない場合は、 *を使用します。

  • <account-number>: S3 バケットを所有する AWS アカウント番号 (例: 123456789012)。 アカウント番号を指定しない場合は、 *を使用します。

SQS および SNS ARN 仕様の文字列 databricks-auto-ingest-* は、 cloudFiles ソースが SQS および SNS サービスを作成するときに使用する名前プレフィックスです。 Databricks はストリームの初回実行時に通知サービスを設定するため、初回実行後に権限を減らしたポリシーを使用できます (たとえば、ストリームを停止してから再起動するなど)。

注:

上記のポリシーは、ファイル通知サービス (S3 バケット通知、SNS、SQS サービス) の設定に必要なアクセス許可のみに関係しており、S3 バケットへの読み取りアクセスがすでにあることを前提としています。 S3 読み取り専用のアクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup ステートメントのActionリストに以下を追加します。

  • s3:ListBucket

  • s3:GetObject

初期セットアップ後のアクセス許可の制限

上記のリソース設定権限は、ストリームの初回実行時にのみ必要です。 最初の実行後、アクセス許可を減らした次の IAM ポリシーに切り替えることができます。

重要

アクセス許可が減ると、障害が発生した場合に新しいストリーミング クエリを開始したり、リソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除されたなど)。また、クラウド リソース管理 API を使用してリソースを一覧表示または破棄することもできません。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

別の AWS アカウントにデータを安全に取り込む

ロールを引き受けることでAuto Loader AWSアカウント全体にデータをロードできます。IAMAssumeRole によって作成された一時的なセキュリティ資格情報を設定した後、 Auto Loader でクラウド ファイルをクロスアカウントとして読み込むことができます。 cross-AWS アカウントの Auto Loader を設定するには、 AssumeRole ポリシーを使用して cross-アカウント S3 バケットにアクセスする のドキュメントに従ってください。 次のことを確認してください。

  • AssumeRole メタロールがクラスターに割り当てられていることを確認します。

  • クラスターの Spark 構成を構成して、次のプロパティを含めます。

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

GCSのファイル通知を設定するために必要な権限

GCS バケットとすべてのオブジェクトに対する list および get のアクセス許可が必要です。 詳細については、 IAM のアクセス許可に関する Google のドキュメントを参照してください。

ファイル通知モードを使用するには、 GCS サービス アカウント と Google クラウド Pub/Sub リソースへのアクセスに使用するアカウントの権限を追加する必要があります。

Pub/Sub Publisher ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google クラウド Pub/Sub にイベント通知メッセージを公開できます。

Google Cloud Pub/Subリソースに使用するサービスアカウントについては、以下の権限を追加する必要があります。

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

これを行うには、これらのアクセス許可を持つ IAM カスタムロールを作成するか、 既存の GCP ロール を割り当ててこれらのアクセス許可をカバーします。

GCS サービスアカウントの検索

対応するプロジェクトの Google Cloud コンソールで、[ Cloud Storage > Settings] に移動します。 「クラウドストレージサービスアカウント」セクションには、GCSサービスアカウントのEメールが含まれています。

GCS サービス アカウント

ファイル通知モード用のカスタム Google Cloud IAMロールの作成

対応するプロジェクトの Google Cloud コンソールで、[ IAM & Admin > Roles] に移動します。 次に、上部にロールを作成するか、既存のロールを更新します。 ロールの作成または編集画面で、[ Add Permissions] をクリックします。 必要な権限をロールに追加できるメニューが表示されます。

GCP IAM カスタムロール

ファイル通知リソースを手動で構成または管理する

特権ユーザーは、ファイル通知リソースを手動で構成または管理できます。

  • クラウドプロバイダーを通じてファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については、「 ファイル通知オプション 」を参照してください。

  • 次の例に示すように、 Scala APIs を使用して通知とキューイング サービスを作成または管理します。

注:

クラウド インフラストラクチャを構成または変更するには、適切なアクセス許可が必要です。 AzureS3または GCS の権限に関するドキュメントを参照してください。

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

setUpNotificationServices(<resource-suffix>) を使用して、<prefix>-<resource-suffix> という名前のキューとサブスクリプションを作成します (プレフィックスは、Auto Loader ファイル通知モードで使用されるクラウド リソースに要約されているストレージ システムによって異なります。同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成する代わりに既存のリソースを再利用します。 この関数は、ファイル通知オプションの識別子を使用してcloudFilesソースに渡すことができるキュー識別子を返します。これにより、 cloudFiles ソース ユーザーは、リソースを作成するユーザーよりも少ないアクセス許可を持つことができます。

を呼び出す場合にのみnewManagerする"path"オプションを提供します (setUpNotificationServices;listNotificationServicestearDownNotificationServicesには必要ありません。これは、ストリーミング クエリを実行するときに使用するのと同じ path です。

次のマトリックスは、ストレージの種類ごとに、どの Databricks Runtime でどの API メソッドがサポートされているかを示しています。

クラウドストレージ

セットアップAPI

リストAPI

ティアダウンAPI

AWS S3の

すべてのバージョン

すべてのバージョン

すべてのバージョン

ADLS Gen2

すべてのバージョン

すべてのバージョン

すべてのバージョン

GCSの

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Azure Blobストレージ

すべてのバージョン

すべてのバージョン

すべてのバージョン

ADLS Gen1

サポートされていません

サポートされていません

サポートされていません