メインコンテンツまでスキップ

ファイル通知モードで Auto Loader ストリームを構成する

このページでは、ファイル通知モードを使用してクラウド データを段階的に検出して取り込むように Auto Loader ストリームを構成する方法について説明します。

ファイル通知モードでは、Auto Loaderは入力ディレクトリからのファイルイベントを購読する通知サービスとキューサービスを自動的に設定します。ファイル通知機能を利用することで、Auto Loaderを拡張し、1時間に数百万個のファイルを取り込むことができます。ディレクトリ一覧表示モードと比較すると、ファイル通知モードの方が高速で拡張性にも優れています。また、ファイル通知とディレクトリ一覧表示をいつでも切り替えることができ、データ処理が一度だけ行われるという保証も維持されます。

ファイル通知オプションやクラウド固有の認証オプションなど、Auto Loader のすべての構成設定の完全なリファレンスについては、 「Auto Loader オプション」を参照してください。

注記

ファイルイベントによるファイル通知モードはコストと拡張性を向上させるものの、ファイルの検出または処理の順序を保証するものではありません。ファイルの到着順が前後する場合でも処理できるようにパイプラインを設計してください。手順については、 「順不同データの処理」を参照してください。

外部ロケーションでファイルイベントが有効になっているかどうかのファイル通知モード

ファイル通知モードを使用するように Auto Loader を設定するには、次の 2 つの方法があります。

  • (推奨)ファイルイベント:指定された外部ロケーションからファイルを処理するすべてのストリームに対して、単一のファイル通知キューを使用します。この方式は、従来のファイル通知方式に比べて以下の利点があります。

    • Databricks では、サービス資格情報やその他のクラウド固有の認証オプションを使用して Auto Loader に追加の資格情報を提供する必要なく、クラウド ストレージ アカウントにサブスクリプションとファイル イベントを設定できます。「外部ロケーションのファイル イベントのセットアップ」を参照してください。
    • クラウドストレージアカウントで作成するIAMロールポリシーが少なくなります。
    • Auto Loaderストリームごとにキューを作成する必要がなくなったため、 「クラシックAuto Loaderファイル通知モードで使用されるクラウド リソース」にリストされているクラウド プロバイダーの通知制限に達することを回避するのが簡単になります。
    • Databricks はリソース要件のチューニングを自動的に管理するため、 cloudFiles.fetchParallelismなどのパラメーターを調整する必要はありません。
    • クリーンアップ機能を使用すると、ストリームが削除されたときや完全に更新されたときなど、クラウド上で作成される通知のライフサイクルについてそれほど心配する必要はありません。
  • 従来のファイル通知モード:各Auto Loaderごとにファイル通知キューを個別に管理します。 Auto Loaderは、入力ディレクトリからのファイルイベントを購読する通知サービスとキューサービスを自動的に設定します。これは古典的な手法だ。

ディレクトリ一覧モードでAuto Loader使用する場合、 Databricksファイル イベントを使用したファイル通知モードに移行することをお勧めします。 ファイル イベントを備えたAuto Loaderにより、パフォーマンスが大幅に向上します。 まず、外部ロケーションのファイル イベントを有効にしてから、 Auto Loaderストリーム構成でcloudFiles.useManagedFileEventsを設定します。

ファイル通知モードをファイルイベントで使用する

このセクションでは、ファイル イベントを使用するためにAuto Loaderストリームを作成および更新する方法について説明します。 Databricksは、ファイル通知モードを使用する際に、以下の点を強く推奨します。

  • Unity Catalogボリュームを使用する: Auto Loaderデータを読み込むパスまたはサブディレクトリごとに、個別の外部ボリュームを作成します。 クラウドストレージのURL(例: s3://bucket/path )の代わりに、ボリュームパス(例: /Volumes/catalog/schema/volume )をAuto Loaderに渡します。これにより、ファイルイベントサービスが外部ロケーション内のすべてのオブジェクトを反復処理するのではなく、関連するオブジェクトのみにファイル検出の範囲を絞り込むことができるため、ファイル検出のパフォーマンスが向上します。
  • サブパスごとに個別のボリュームを使用する: 同じ外部ロケーションにある異なるサブパスから複数のAuto Loaderストリームを読み取っている場合は、単一のボリュームを共有するのではなく、サブパスごとに専用のボリュームを作成します。 これにより、不要なファイル検出のオーバーヘッドが回避され、レート制限を防ぐのに役立ちます。

始める前に

ファイルイベントを設定するには、次のものが必要です。

  • Unity Catalog に対して有効になっている Databricks ワークスペース。
  • ストレージ資格情報と外部ロケーション オブジェクトを Unity Catalogに作成する権限。

ファイル イベントを含む Auto Loader ストリームには、次のものが必要です。

  • コンピュート Databricks Runtime 14.3 LTS 以上。

設定手順

次の手順は、新しい Auto Loader ストリームを作成する場合でも、既存のストリームを移行してファイルイベントでアップグレードされたファイル通知モードを使用する場合にも適用されます。

  1. ストレージ資格 情報外部ロケーション を Unity Catalog に作成して、 Auto Loader ストリームのクラウドストレージのソースロケーションへのアクセスを許可します。
  2. 外部ロケーションのファイル イベントを有効にします。 「外部ロケーションのファイル イベントのセットアップ」を参照してください。
  3. 新しい Auto Loader ストリームを作成するか、既存のストリームを編集して外部ロケーションと連携させる場合:
    • 外部ロケーションからのデータを使用する既存の 通知ベースの Auto Loader ストリーム がある場合は、それらをオフにして、関連付けられている通知リソースを削除します。
    • pathRewritesが設定されていないことを確認します (これは一般的なオプションではありません)。
    • がファイル イベントを使用してファイル通知を管理するときに無視する 設定の一覧 を確認します。Auto Loader新しい Auto Loader ストリームではそれらを避け、このモードに移行する既存のストリームから削除します。
    • Auto Loader コードでオプション cloudFiles.useManagedFileEventstrue に設定します。

例えば:

Python
autoLoaderStream = (spark.readStream
.format("cloudFiles")
...
.options("cloudFiles.useManagedFileEvents", True)
...)

Lakeflow Spark宣言型パイプラインを使用していて、ストリーミング テーブルを備えたパイプラインがすでにある場合は、それを更新してuseManagedFileEventsオプションを含めます。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
FROM STREAM read_files('abfss://path/to/external/location/or/volume',
format => '<format>',
useManagedFileEvents => 'True'
...
);

サポートされていない Auto Loader 設定

次の Auto Loader 設定は、ストリームがファイルイベントを使用する場合にサポートされません。

設定

変更

useIncremental

ファイル通知の効率性とディレクトリリストのシンプルさのどちらかを決定する必要はもうありません。ファイルイベントを使用したAuto Loaderには、1つのモードがあります。

useNotifications

キューとストレージ・イベント・サブスクリプションは、外部ロケーションごとに 1 つだけ存在します。

cloudFiles.fetchParallelism

Auto Loader with file イベントでは、手動の並列処理の最適化は提供されません。

cloudFiles.backfillInterval

Databricks は、ファイル イベントが有効になっている外部ロケーションのバックフィルを自動的に処理します。

cloudFiles.pathRewrites

このオプションは、外部データの場所を DBFS にマウントする場合にのみ適用されます。これは非推奨です。

resourceTags

リソースタグは、クラウドコンソールを使用して設定する必要があります。

管理されたファイル イベントのベスト プラクティスについては、 「ファイル イベントを使用した Auto Loader のベスト プラクティス」を参照してください。

ファイルイベントを使用した Auto Loader に関する制限

ファイル・イベント・サービスは、最近作成されたファイルをキャッシュすることで、ファイル検出を最適化します。Auto Loaderの実行頻度が低い場合、このキャッシュは期限切れになり、Auto Loaderディレクトリの一覧にフォールバックしてファイルを検出し、キャッシュを更新します。このシナリオを回避するには、少なくとも 7 日ごとに Auto Loader を呼び出します。

ファイル・イベントの制限の一般的なリストについては、 ファイル・イベントの制限を参照してください。

各Auto Loaderのファイル通知キューを個別に管理する (クラシック)

従来のファイル通知モードでは、Auto Loaderは各ストリーム専用の通知サービスとキューを自動的に設定します。このアプローチでは、ストリームごとに通知キューを管理し、クラウド リソースの作成に必要な認証情報を提供する必要があります。Databricksは、新規ワークロードにはファイル通知モードを推奨しています。

重要

ファイル通知モード用にクラウドインフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 以下をご覧ください:

従来のファイル通知モードでは、Auto Loader は入力ディレクトリからのファイルイベントを購読する各ストリームに対して、通知サービスとキューサービスを自動的に設定します。各Auto Loaderストリームの通知キューを個別に管理します。

警告

Auto Loader 、クラシックファイル通知モードでのソースパスの変更をサポートしていません。 パスを変更すると、パスの更新時に新しい場所に既に存在するファイルの取り込みに失敗する可能性があります。

従来の Auto Loader ファイル通知モードで使用されるクラウド リソース

Auto Loader は、オプション cloudFiles.useNotificationstrue に設定し、クラウド リソースを作成するために必要なアクセス許可を提供すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限をに付与するための 追加オプション を提供する必要がある場合があります。Auto Loader

次の表は、Auto Loader が各クラウド プロバイダーに対して作成するリソースの一覧です。

クラウドストレージ

サブスクリプションサービス

キューサービス

接頭辞*

制限**

Amazon S3

AWS SNSの

AWS SQSの

Databricks の自動取り込み

S3バケットあたり100個

ADLSの

Azure イベント グリッド

Azure キュー ストレージ

Databricks

ストレージ アカウントあたり 500

GCSの

Google Pub/Sub

Google Pub/Sub

Databricks の自動取り込み

GCSバケットあたり100個

Azure Blobストレージ

Azure イベント グリッド

Azure キュー ストレージ

Databricks

ストレージ アカウントあたり 500

* Auto Loader は、このプレフィックスを使用してリソースに名前を付けます。

** 起動できる並列ファイル通知パイプラインの数

これらの制限で許可されているよりも多くのファイル通知ベースの Auto Loader ストリームを実行する必要がある場合は、 ファイル イベント またはサービス (AWS Lambda、Azure Functions、Google Cloud Functions など) を使用して、コンテナーまたはバケット全体をリッスンする 1 つのキューからディレクトリ固有のキューに通知をファンアウトできます。

クラシックファイル通知イベント

Amazon S3 は、ファイルが S3 バケットにアップロードされたときに、プットアップロードとマルチパートアップロードのどちらによってアップロードされたかに関係なく、 ObjectCreated イベントを提供します。

Azure データレイク Storage は、ストレージ コンテナーに表示されるファイルに対してさまざまなイベント通知を提供します。

  • Auto Loader は、ファイルを処理するために FlushWithClose イベントをリッスンします。
  • Auto Loader ストリームは、ファイルを検出するためのRenameFileアクションをサポートします。 RenameFile アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージシステムへのAPIリクエストが必要です。
  • Auto LoaderDatabricks Runtime9.0RenameDirectory 以降で作成された ストリームは、ファイルを検出するための アクションをサポートします。RenameDirectory アクションでは、名前が変更されたディレクトリの内容を一覧表示するために、ストレージシステムへのAPIリクエストが必要です。

Google Cloud ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。

注記

クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの遅延に関する厳格なSLAを提供していません。 Databricks では、cloudFiles.backfillInterval オプションを使用して Auto Loader で定期的なバックフィルをトリガーし、データの完全性が要件である場合は、特定の SLA 内のすべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。

Azure データレイク ストレージと Azure Blob ストレージのファイル通知を設定するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 「Azure Blob Storage」を参照してください。

ファイル通知モードを使用するには、イベント通知サービスの設定およびアクセスに必要な認証情報を提供する必要があります。以下のいずれかの方法で認証できます。

認証情報を取得したら、以下のいずれかの方法を使用して必要な権限を割り当ててください。

  • Azure の組み込みロール:

    • 入力パスが存在するストレージアカウントに対して、アクセスコネクタに以下の役割を割り当ててください。

      • 貢献者:この役割は、キューやイベント購読など、ストレージアカウント内のリソースを設定するためのものです。
      • ストレージキューデータコントリビューター:この役割は、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。この役割は、接続文字列を指定せずにサービスプリンシパルを提供する場合にのみ必要です。
    • アクセスコネクタに、関連するリソースグループに対して以下の役割を割り当てます。

      • EventGrid EventSubscription 共同作成者 : このロールは、イベント サブスクリプションの作成や一覧表示など、Azure Event Grid (Event Grid) サブスクリプション操作を実行するためのものです。
  • カスタムロール:前述のロールに必要な権限を付与することに懸念がある場合は、代わりに以下の権限を持つカスタムロールを作成できます。役割を作成したら、それをアクセスコネクタに割り当ててください。詳細については、 Azureポータルを使用したAzureロールの割り当て」を参照してください。

    JSON
    "permissions": [
    {
    "actions": [
    "Microsoft.EventGrid/eventSubscriptions/write",
    "Microsoft.EventGrid/eventSubscriptions/read",
    "Microsoft.EventGrid/eventSubscriptions/delete",
    "Microsoft.EventGrid/locations/eventSubscriptions/read",
    "Microsoft.Storage/storageAccounts/read",
    "Microsoft.Storage/storageAccounts/write",
    "Microsoft.Storage/storageAccounts/queueServices/read",
    "Microsoft.Storage/storageAccounts/queueServices/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/read",
    "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
    ],
    "notActions": [],
    "dataActions": [
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
    "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
    ],
    "notDataActions": []
    }
    ]

    Auto Loader 権限設定

Amazon S3 のファイル通知を設定するために必要なアクセス許可

入力ディレクトリの読み取り権限が必要です。 詳細については、 S3 接続の詳細 を参照してください。

ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。このIAM ロールは、Databricks認証に使用する サービス資格情報 Auto Loaderを作成するために必要です。サービス資格情報のサポートは、Databricks Runtime 16.1 以降で利用できます。

JSON
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}

以下のプレースホルダーを、ご使用の環境に合わせて値に置き換えてください。

  • <bucket-name>: ストリームがファイルを読み取る S3 バケット名 (例: auto-logs)。 * をワイルドカードとして使用できます。たとえば databricks-*-logs です。 DBFS パスの基になる S3 バケットを見つけるには、 %fs mountsを実行して、ノートブック内のすべての DBFS マウント ポイントを一覧表示できます。
  • <region>: S3 バケットが存在する AWS リージョン (例: us-west-2)。地域を指定しない場合は、 *を使用します。
  • <account-number>: S3バケットを所有するAWSアカウント番号 (例: 123456789012 )。 アカウント番号を指定したくない場合は、 *を使用します。

SQS および SNS のARN仕様の文字列databricks-auto-ingest-*は、SQS および SNS サービスの作成時にcloudFilesソースが使用する名前のプレフィックスです。 Databricksはストリームの初回実行時に通知サービスを設定するため、初回実行後に権限を制限したポリシーを使用できます(たとえば、ストリームを停止してから再起動する場合など)。

注記

前述のポリシーは、ファイル通知サービス、つまりS3バケット通知、SNS、および SQS サービスの設定に必要な権限のみに関係しており、 S3バケットへの読み取りアクセス権をすでに持っていることを前提としています。 S3の読み取り専用権限を追加する必要がある場合は、JSONドキュメントのDatabricksAutoLoaderSetupステートメント内のActionリストに以下を追加してください。

  • s3:ListBucket
  • s3:GetObject

初期設定後の権限の制限

ストリームの初回実行時のみ、上記で説明したリソース設定権限が必要になります。最初の実行後は、権限が制限された次のIAMポリシーに切り替えることができます。 ただし、権限が制限されていると、新しいストリーミング クエリを開始したり、SQS キューを誤って削除した場合などのリソース障害後にリソースを再作成したり、クラウド リソース管理API使用してリソースを一覧表示したり破棄したりすることはできません。

JSON
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": ["arn:aws:s3:::<bucket-name>"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::<bucket-name>/*"]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}

別の AWS アカウントにデータを安全に取り込む

Auto Loader 、 AssumeRoleを使用してIAMロールを想定することで、 AWSアカウント全体にデータをロードできます。 クロスアカウント取り込み用にAuto Loader設定するには、 「AssumeRole ポリシーを使用してクロスアカウントS3バケットにアクセスする」で説明されている手順に従います。 次に、 AssumeRoleメタロールがクラスターに割り当てられていることを確認し、クラスターのSpark構成に次のプロパティが含まれるように構成します。

ini
fs.s3a.credentialsType AssumeRole
fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
fs.s3a.acl.default BucketOwnerFullControl

GCS のファイル通知を設定するために必要な権限

ファイル通知モードを使用するには、権限を設定する必要があります。

  • GCSバケットおよびすべてのオブジェクトに対して、 listget権限が付与されていることを確認してください。詳細については、 IAM権限に関するGoogleのドキュメントを参照してください。

  • Pub/Sub Publisher ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google Cloud Pub/Sub にイベント通知メッセージを公開できます。

  • Google クラウド Pub/Sub リソースに使用されるサービス アカウントに次の権限を追加します。 これらの権限を持つIAMカスタムロールを作成するか、既存のGCPロールを割り当ててこれらの権限をカバーすることができます。Databricksは、サービス認証情報を作成する際に、このサービスアカウントを自動的に作成します。サービス認証情報のサポートは、Databricks Runtime 16.1以降で利用可能です。

    pubsub.subscriptions.consume
    pubsub.subscriptions.create
    pubsub.subscriptions.delete
    pubsub.subscriptions.get
    pubsub.subscriptions.list
    pubsub.subscriptions.update
    pubsub.topics.attachSubscription
    pubsub.topics.detachSubscription
    pubsub.topics.create
    pubsub.topics.delete
    pubsub.topics.get
    pubsub.topics.list
    pubsub.topics.update

GCS サービス アカウントの検索

対応するプロジェクトの Google Cloud Console で [クラウド ストレージ] > [設定] に移動します。 クラウド ストレージ サービス アカウント セクションには、 GCSサービス アカウントの電子メールが含まれます。

GCS サービス アカウント

ファイル通知 用のカスタムGoogle Cloud ロールの作成IAMMode

対応するプロジェクトの Google クラウド コンソールで IAMと管理] > [ロール] に移動します。 次に、役割を作成するか、既存の役割を更新します。 「役割の作成」 ページで、 「権限の追加」 を選択します。メニューで、役割に必要な権限を追加します。

GCP IAM カスタムロール

ファイル通知リソースを手動で構成または管理する

特権ユーザーは、ファイル通知リソースを手動で設定または管理できます。クラウドプロバイダー経由でファイル通知サービスを設定し、キュー識別子を指定するには、 「ファイル通知オプション」を参照してください。代わりにScala APIs使用して通知とキューイングサービスを作成または管理するには、次の方法を使用してください。

注記

クラウド インフラストラクチャを構成または変更するには、適切なアクセス許可が必要です。 AzureS3または GCS の権限に関するドキュメントを参照してください。

ステップ 1: AWS 、 Azure 、または Google クラウドで ResourceManager を作成する

Python

# Create a ResourceManager in AWS

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.create()

# Using an AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("cloudFiles.awsAccessKey", <aws-access-key>) \
.option("cloudFiles.awsSecretKey", <aws-secret-key>) \
.option("cloudFiles.roleArn", <role-arn>) \
.option("cloudFiles.roleExternalId", <role-external-id>) \
.option("cloudFiles.roleSessionName", <role-session-name>) \
.option("cloudFiles.stsEndpoint", <sts-endpoint>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

Python

# Create a ResourceManager in Azure

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()

Python

# Create a ResourceManager in GCP

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("cloudFiles.client", <client-id>) \
.option("cloudFiles.clientEmail", <client-email>) \
.option("cloudFiles.privateKey", <private-key>) \
.option("cloudFiles.privateKeyId", <private-key-id>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()

ステップ 2: リソース マネージャーを使用して、ファイル通知サービスをセットアップ、表示、破棄する

Python

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>.
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

setUpNotificationServices(<resource-suffix>)を使用して、 <prefix>-<resource-suffix>という名前のキューとサブスクリプションを作成します (プレフィックスは、クラシックAuto Loaderファイル通知モードで使用されるクラウド リソースにまとめられているストレージ システムによって異なります)。 同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成するのではなく、既存のリソースを再利用します。この関数は、ファイル通知オプションの識別子を使用してcloudFilesソースに渡すことができるキュー識別子を返します。これにより、 cloudFilesソース ユーザーは、リソースを作成したユーザーよりも少ない権限を持つことができます。

setUpNotificationServicesを呼び出す場合にのみ、 newManager"path"オプションを提供してください。listNotificationServicesまたはtearDownNotificationServicesには必要ありません。これは、ストリーミングクエリを実行する際に使用するpathと同じです。

次のマトリックスは、ストレージの種類ごとに、どの Databricks Runtime でどの API メソッドがサポートされているかを示しています。

クラウドストレージ

セットアップAPI

リストAPI

ティアダウンAPI

Amazon S3

すべてのバージョン

すべてのバージョン

すべてのバージョン

ADLSの

すべてのバージョン

すべてのバージョン

すべてのバージョン

GCSの

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Databricks Runtime 9.1 以降

Azure Blobストレージ

すべてのバージョン

すべてのバージョン

すべてのバージョン

Auto Loader が作成したイベント通知リソースをクリーンアップします

Auto Loader は、ファイル通知リソースを自動的に破棄しません。 ファイル通知リソースを破棄するには、前のセクションで示したように、クラウド リソース マネージャーを使用する必要があります。 これらのリソースは、クラウド プロバイダーの UI または APIを使用して手動で削除することもできます。

一般的なエラーのトラブルシューティング

このセクションでは、ファイル通知モードで Auto Loader を使用する場合の一般的なエラーとその解決方法について説明します。

Event Grid サブスクリプションを作成できませんでした

Auto Loader を初めて実行したときに次のエラー メッセージが表示される場合は、Azure サブスクリプションに Event Grid がリソース プロバイダーとして登録されていません。

java.lang.RuntimeException: Failed to create event grid subscription.

Event Grid をリソース プロバイダーとして登録するには、次の操作を行います。

  1. Azure portal で、サブスクリプションに移動します。
  2. [設定] セクションで [リソース プロバイダー] を選択します。
  3. 登録する the provider Microsoft.EventGrid.

Event Grid サブスクリプション操作を実行するために必要な承認

Auto Loader を初めて実行するときに次のエラー メッセージが表示される場合は、 共同作成者 ロールが Event Grid のサービス プリンシパルとストレージ アカウントに割り当てられていることを確認します。

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Event Grid クライアントがプロキシをバイパス

Databricks Runtime 15.2 以降では、Auto Loader の Event Grid 接続では、デフォルトによるシステム プロパティのプロキシ設定が使用されます。Databricks Runtime 13.3 LTS、14.3 LTS、および 15.0 から 15.2 では、 Spark Config プロパティを spark.databricks.cloudFiles.eventGridClient.useSystemProperties trueに設定することで、プロキシを使用するように Event Grid 接続を手動で構成できます。 「Databricks での Spark 構成プロパティの設定」を参照してください

リクエストが多すぎます

Auto Loaderストリーム ログに次のエラー メッセージが表示される場合は、ストリームがDatabricksファイル イベント サービスのレート制限を超えていることを示します。

com.databricks.sql.util.UnexpectedHttpStatus: Too many requests. Please wait a moment and try again.

これは通常、複数のAuto LoaderストリームがUnity Catalogボリュームを使用せずに、同じ外部ロケーションにある異なるサブパスから読み取った場合に発生します。 ファイルイベントサービスは、各ストリームに関連するファイルを見つけるために、外部ロケーションにあるすべてのオブジェクトを反復処理する必要があり、その結果、API呼び出しが過剰になります。この問題を解決するには、 「ファイルイベントでファイル通知モードを使用する」に記載されている推奨事項に従ってください。

その他のリソース