Auto Loaderファイル通知モードとは何ですか?
ファイル通知モードでは、 Auto Loader は、入力ディレクトリからファイル イベントをサブスクライブする通知サービスとキュー サービスを自動的に設定します。 ファイル通知を使用して、1 時間に数百万のファイルを取り込むようにスケール Auto Loader できます。 ディレクトリ一覧モードと比較すると、ファイル通知モードはよりパフォーマンスとスケーラブルです。
ファイル通知とディレクトリリストをいつでも切り替えることができ、exactly-onceのデータ処理の保証は維持できます。
Auto Loaderのソース パスの変更は、ファイル通知モードではサポートされていません。ファイル通知モードが使用されている場合、パスが変更されると、ディレクトリの更新時に新しいディレクトリに既に存在するファイルの取り込みに失敗する可能性があります。
ファイル通知モード(管理ファイル・イベントあり/なし)
ファイル通知モードを使用するように Auto Loader を設定するには、次の 2 つの方法があります。
-
従来のファイル通知モード: ストリームを個別に Auto Loader するファイル通知キューを管理します。
これが従来のアプローチです。
-
(推奨) マネージド ファイル イベント (パブリック プレビュー):Databricks で定義されている特定の外部ロケーションからのファイルを処理するすべてのストリームに対して、1 つの マネージドUnity Catalog ファイル通知キューを使用します。
この方法では、外部ロケーションのファイル・イベントを有効にする必要があります。 従来のアプローチに比べて、次の利点があります。
- Databricks では、サービス資格情報やその他のクラウド固有の認証オプションを使用して Auto Loader に追加の資格情報を提供することなく、クラウド ストレージ アカウントにサブスクリプションとファイル イベントを設定できます。(推奨) 外部ロケーションのファイル・イベントを有効にするを参照してください。
- クラウドストレージアカウントで作成するIAMロールポリシーが少なくなります。
- Auto Loaderストリームごとにキューを作成する必要がなくなったため、レガシー Auto Loader ファイル通知モードで使用される Cloud リソースに記載されているクラウド プロバイダー通知制限に達するのを簡単に回避できます。
- Databricks はリソース要件のチューニングを自動的に管理するため、
cloudFiles.fetchParallelism
などのパラメーターを調整する必要はありません。 - クリーンアップ機能を使用すると、ストリームが削除されたときや完全に更新されたときなど、クラウド上で作成される通知のライフサイクルについてそれほど心配する必要はありません。
Databricks では、現在ディレクトリ一覧モードで Auto Loader を使用している場合は、マネージド ファイル イベントに移行してパフォーマンスの大幅な向上を確認することをお勧めします。 管理ファイル・イベントでのファイル通知モードの使用を参照してください。
ファイル通知モードを管理対象ファイルイベントで使用する
プレビュー
マネージド ファイル イベントの Auto Loader サポートはパブリック プレビュー段階です。プレビューに登録するには、Databricks アカウント チームにお問い合わせください。
必要条件
管理対象ファイル・イベントには、以下のものが必要です。
- Unity Catalog に対して有効になっている Databricks ワークスペース。
- ストレージ資格情報と外部ロケーション オブジェクトを Unity Catalogに作成する権限。
設定手順
次の手順は、新しい Auto Loader ストリームを作成する場合でも、既存のストリームを移行して、マネージド ファイル イベントでアップグレードされたファイル通知モードを使用する場合にも適用されます。
-
Unity Catalog でストレージ資格情報と外部ロケーションを作成し、Auto Loader ストリームのクラウド ストレージ内のソース ロケーションへのアクセスを許可します。クラウドストレージをDatabricksに接続するための外部ロケーションの作成を参照してください。
-
外部ロケーションのファイル・イベントを有効にします。 (推奨) 外部ロケーションのファイル・イベントを有効にするを参照してください。
-
新しい Auto Loader ストリームを作成するか、既存のストリームを編集して外部ロケーションと連携させる場合:
-
pathRewrites
が設定されていないことを確認します (これは一般的なオプションではありません)。 -
がマネージド ファイル イベントを使用してファイル通知を管理するときに無視する 設定のリスト を確認します。Auto Loader新しい Auto Loader ストリームではそれらを避け、このモードに移行する既存のストリームから削除します。
-
Auto Loader コードでオプション
cloudFiles.useManagedFileEvents
をtrue
に設定します。例えば:PythonautoLoaderStream = (spark.readStream
.format("cloudFiles")
...
.options("cloudFiles.useManagedFileEvents", True)
...)DLTを使用していて、ストリーミングテーブルを含むDLTパイプラインがすでにある場合は、
useManagedFileEvents
オプションを含むように更新します。SQLCREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
FROM STREAM read_files('abfss://path/to/external/location/or/volume',
format => '<format>',
useManagedFileEvents => 'True'
...
)
-
サポートされていない Auto Loader 設定
次の Auto Loader 設定は、ストリームがマネージド ファイル イベントを使用する場合にサポートされません。
設定 | 変更 |
---|---|
| ファイル通知の効率性とディレクトリリストのシンプルさのどちらかを決定する必要はもうありません。Managed File Events の Auto Loader には 1 つのモードがあります。 |
| キューとストレージ・イベント・サブスクリプションは、外部ロケーションごとに 1 つだけ存在します。 |
| Auto Loader with managed file events では、手動での並列処理の最適化は提供されません。 |
| Databricks は、管理対象ファイル・イベントが有効になっている外部ロケーションのバックフィルを自動的に処理します。 |
| このオプションは、外部データの場所を DBFS にマウントする場合にのみ適用されます。これは非推奨です。 |
| リソースタグは、クラウドコンソールを使用して設定する必要があります。 |
管理対象ファイル・イベントに関する制限
管理対象ファイル・イベントの制限のリストについては、 ファイル・イベントの制限を参照してください。
追加の制限は Auto Loaderにのみ適用されます。管理されたファイル イベントは 7 日間保持されます。 したがって、Auto Loader は少なくとも 7 日に 1 回呼び出す必要があります。
それぞれのファイル通知キューの管理: Auto Loader ストリームを個別に (レガシ)
ファイル通知モード用にクラウドインフラストラクチャを自動的に構成するには、昇格されたアクセス許可が必要です。 クラウド管理者またはワークスペース管理者に問い合わせてください。 以下をご覧ください:
従来の Auto Loader ファイル通知モードで使用されるクラウド リソース
Auto Loader は、オプション cloudFiles.useNotifications
を true
に設定し、クラウド リソースを作成するために必要なアクセス許可を提供すると、ファイル通知を自動的に設定できます。 さらに、これらのリソースを作成するための権限をに付与するための 追加オプション を提供する必要がある場合があります。Auto Loader
次の表に、各クラウド プロバイダーの Auto Loader によって作成されるリソースを示します。
クラウドストレージ | サブスクリプションサービス | キューサービス | 接頭辞* | 制限** |
---|---|---|---|---|
Amazon S3 | AWS SNSの | AWS SQSの | Databricks の自動取り込み | S3バケットあたり100個 |
ADLSの | Azure イベント グリッド | Azure キュー ストレージ | Databricks | ストレージ アカウントあたり 500 |
GCSの | Google Pub/Sub | Google Pub/Sub | Databricks の自動取り込み | GCSバケットあたり100個 |
Azure Blobストレージ | Azure イベント グリッド | Azure キュー ストレージ | Databricks | ストレージ アカウントあたり 500 |
* Auto Loader は、このプレフィックスを使用してリソースに名前を付けます。
** 起動できる並列ファイル通知パイプラインの数
これらの制限で許可されている数よりも多くのファイル通知ベースの Auto Loader ストリームを実行する必要がある場合は、 マネージド ファイル イベント や AWS Lambda、 Azure Functions、Google Cloud Functions などのサービスを使用して、コンテナまたはバケット全体をリッスンする単一のキューからディレクトリ固有のキューに通知をファンアウトできます。
従来のファイル通知イベント
Amazon S3 は、ファイルが S3 バケットにアップロードされたときに、プットアップロードとマルチパートアップロードのどちらによってアップロードされたかに関係なく、 ObjectCreated
イベントを提供します。
Azure データレイク Storage は、ストレージ コンテナーに表示されるファイルに対してさまざまなイベント通知を提供します。
- Auto Loader は、ファイルを処理するために
FlushWithClose
イベントをリッスンします。 - Auto Loader ストリームは、ファイルを検出するための
RenameFile
アクションをサポートします。RenameFile
アクションでは、名前が変更されたファイルのサイズを取得するために、ストレージシステムへのAPIリクエストが必要です。 - Auto LoaderDatabricks Runtime9.0
RenameDirectory
以降で作成された ストリームは、ファイルを検出するための アクションをサポートします。RenameDirectory
アクションでは、名前が変更されたディレクトリの内容を一覧表示するために、ストレージシステムへのAPIリクエストが必要です。
Google Cloud ストレージでは、ファイルのアップロード時に上書きやファイルのコピーなどの OBJECT_FINALIZE
イベントが提供されます。 アップロードに失敗しても、このイベントは生成されません。
クラウドプロバイダーは、非常にまれな条件下ですべてのファイルイベントの100%配信を保証するものではなく、ファイルイベントの遅延に関する厳格なSLAを提供していません。 Databricks では、cloudFiles.backfillInterval
オプションを使用して Auto Loader で定期的なバックフィルをトリガーし、データの完全性が要件である場合は、特定の SLA 内のすべてのファイルが検出されるようにすることをお勧めします。 通常のバックフィルをトリガーしても、重複は発生しません。
Azure データレイク ストレージと Azure Blob ストレージのファイル通知を設定するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 「Azure Blob Storage」を参照してください。
ファイル通知モードを使用するには、イベント通知サービスを設定してアクセスするための認証資格情報を指定する必要があります。
次のいずれかの方法を使用して認証できます。
- Databricks Runtime 16.1 以降の場合: Databricks サービス資格情報 (推奨): マネージド ID と Databricks アクセス コネクタを使用して サービス資格情報 を作成します。
- サービスプリンシパル: Microsoft Entra ID (旧称 Azure Active Directory) アプリとサービスプリンシパルをクライアント ID とクライアント シークレットの形式で作成します。
認証資格情報を取得したら、必要なアクセス許可を Databricks アクセス コネクタ (サービス 資格情報の場合) または Microsoft Entra ID アプリ (サービス プリンシパルの場合) に割り当てます。
-
Azure 組み込みロールの使用
アクセス コネクタに、入力パスが存在するストレージ アカウントに次のロールを割り当てます。
- 共同作成者 : このロールは、キューやイベント サブスクリプションなど、ストレージ アカウント内のリソースを設定するためのものです。
- ストレージ キュー データ共同作成者 : このロールは、キューからのメッセージの取得や削除などのキュー操作を実行するためのものです。 このロールは、接続文字列なしでサービスプリンシパルを提供する場合にのみ必要です。
このアクセス コネクタに、関連するリソース グループに次のロールを割り当てます。
- EventGrid EventSubscription 共同作成者 : このロールは、イベント サブスクリプションの作成や一覧表示など、Azure Event Grid (Event Grid) サブスクリプション操作を実行するためのものです。
詳細については、Azureポータルを使用して Azure ロールを割り当てるを参照してください。
-
カスタムロールの使用
上記のロールに必要な過剰なアクセス許可が懸念される場合は、少なくとも次のアクセス許可を持つ カスタム ロール を作成できます (以下、Azure ロール JSON 形式)。
JSON"permissions": [
{
"actions": [
"Microsoft.EventGrid/eventSubscriptions/write",
"Microsoft.EventGrid/eventSubscriptions/read",
"Microsoft.EventGrid/eventSubscriptions/delete",
"Microsoft.EventGrid/locations/eventSubscriptions/read",
"Microsoft.Storage/storageAccounts/read",
"Microsoft.Storage/storageAccounts/write",
"Microsoft.Storage/storageAccounts/queueServices/read",
"Microsoft.Storage/storageAccounts/queueServices/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/read",
"Microsoft.Storage/storageAccounts/queueServices/queues/delete"
],
"notActions": [],
"dataActions": [
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
],
"notDataActions": []
}
]その後、このカスタム ロールをアクセス コネクタに割り当てることができます。
詳細については、Azureポータルを使用して Azure ロールを割り当てるを参照してください。
Amazon S3 のファイル通知を設定するために必要なアクセス許可
入力ディレクトリの読み取り権限が必要です。 詳細については、 S3 接続の詳細 を参照してください。
ファイル通知モードを使用するには、次の JSON ポリシードキュメントを IAM ユーザーまたはロールにアタッチします。このIAM ロールは、Databricks認証に使用する サービス資格情報 Auto Loaderを作成するために必要です。サービス資格情報のサポートは、Databricks Runtime 16.1 以降で利用できます。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
ここで、
<bucket-name>
: ストリームがファイルを読み取る S3 バケット名 (例:auto-logs
)。*
をワイルドカードとして使用できます。たとえばdatabricks-*-logs
です。 DBFS パスの基になる S3 バケットを見つけるには、%fs mounts
を実行して、ノートブック内のすべての DBFS マウント ポイントを一覧表示できます。<region>
: S3 バケットが存在する AWS リージョン (例:us-west-2
)。地域を指定しない場合は、*
を使用します。<account-number>
: S3 バケットを所有する AWS アカウント番号 (例:123456789012
)。アカウント番号を指定しない場合は、*
を使用します。
SQS および SNS ARN 仕様の文字列 databricks-auto-ingest-*
は、 cloudFiles
ソースが SQS および SNS サービスを作成するときに使用する名前プレフィックスです。 Databricks はストリームの初回実行時に通知サービスを設定するため、初回実行後に権限を減らしたポリシーを使用できます (たとえば、ストリームを停止してから再起動するなど)。
上記のポリシーは、ファイル通知サービス (S3 バケット通知、SNS、SQS サービス) の設定に必要なアクセス許可のみに関係しており、S3 バケットへの読み取りアクセスがすでにあることを前提としています。 S3 読み取り専用のアクセス許可を追加する必要がある場合は、JSON ドキュメントの DatabricksAutoLoaderSetup
ステートメントのAction
リストに以下を追加します。
s3:ListBucket
s3:GetObject
初期設定後の権限の制限
上記のリソース設定権限は、ストリームの初回実行時にのみ必要です。 最初の実行後、アクセス許可を減らした次の IAM ポリシーに切り替えることができます。
アクセス許可が減ると、障害が発生した場合に新しいストリーミング クエリを開始したり、リソースを再作成したりすることはできません (たとえば、SQS キューが誤って削除されたなど)。また、Cloud リソース管理 API を使用してリソースをリストアップまたは破棄することもできません。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:PurgeQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Resource": ["arn:aws:s3:::<bucket-name>"]
},
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::<bucket-name>/*"]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
別の AWS アカウントにデータを安全に取り込む
ロールを引き受けることでAuto Loader AWSアカウント全体にデータをロードできます。IAMAssumeRole
によって作成された一時的なセキュリティ資格情報を設定した後、 Auto Loader にクラウドファイルをクロスアカウント間でロードさせることができます。 cross-AWS アカウントの Auto Loader を設定するには、 AssumeRole ポリシーを使用して cross-アカウント S3 バケットにアクセスするのドキュメントに従ってください。 次のことを確認してください。
-
AssumeRole メタロールがクラスターに割り当てられていることを確認します。
-
クラスタリングの Spark 構成を構成して、次のプロパティを含めます。
inifs.s3a.credentialsType AssumeRole
fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
fs.s3a.acl.default BucketOwnerFullControl
GCS のファイル通知を設定するために必要な権限
GCS バケットとすべてのオブジェクトに対する list
および get
のアクセス許可が必要です。 詳細については、 IAM のアクセス許可に関する Google のドキュメントを参照してください。
ファイル通知モードを使用するには、 GCS サービス アカウント と、Google Cloud Pub/Sub リソースへのアクセスに使用するサービス アカウントの権限を追加する必要があります。
Pub/Sub Publisher
ロールを GCS サービス アカウントに追加します。これにより、アカウントは GCS バケットから Google Cloud Pub/Sub にイベント通知メッセージを公開できます。
Google Cloud Pub/Subリソースで利用しているサービスアカウントについては、以下の権限を追加する必要があります。 このサービス アカウントは、Databricks サービス資格情報を作成すると自動的に作成されます。サービス資格情報のサポートは、Databricks Runtime 16.1 以降で利用できます。
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
これを行うには、これらのアクセス許可を持つ IAM カスタムロールを作成するか、 既存の GCP ロール を割り当ててこれらのアクセス許可をカバーします。
GCS サービス アカウントの検索
対応するプロジェクトの Google Cloud コンソールで、Cloud Storage > Settings
に移動します。
「クラウドストレージサービスアカウント」セクションには、GCSサービスアカウントのEメールが含まれています。
ファイル通知 用のカスタムGoogle Cloud ロールの作成IAMMode
対応するプロジェクトの Google Cloud コンソールで、IAM & Admin > Roles
に移動します。 次に、上部にロールを作成するか、既存のロールを更新します。 ロールの作成または編集画面で、Add Permissions
をクリックします。 必要な権限をロールに追加できるメニューが表示されます。
ファイル通知リソースを手動で構成または管理する
特権ユーザーは、ファイル通知リソースを手動で構成または管理できます。
- クラウドプロバイダーを通じてファイル通知サービスを手動で設定し、キュー識別子を手動で指定します。 詳細については、「 ファイル通知オプション 」を参照してください。
- 次の例に示すように、 Scala APIs を使用して通知とキューイング サービスを作成または管理します。
- Python
- Scala
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.create()
# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("cloudFiles.awsAccessKey", <aws-access-key>) \
.option("cloudFiles.awsSecretKey", <aws-secret-key>) \
.option("cloudFiles.roleArn", <role-arn>) \
.option("cloudFiles.roleExternalId", <role-external-id>) \
.option("cloudFiles.roleSessionName", <role-session-name>) \
.option("cloudFiles.stsEndpoint", <sts-endpoint>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("databricks.serviceCredential", <service-credential-name>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("cloudFiles.projectId", <project-id>) \
.option("cloudFiles.client", <client-id>) \
.option("cloudFiles.clientEmail", <client-email>) \
.option("cloudFiles.privateKey", <private-key>) \
.option("cloudFiles.privateKeyId", <private-key-id>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
/**
* Using AWS access key and secret key
*/
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>)
.option("cloudFiles.awsAccessKey", <aws-access-key>)
.option("cloudFiles.awsSecretKey", <aws-secret-key>)
.option("cloudFiles.roleArn", <role-arn>)
.option("cloudFiles.roleExternalId", <role-external-id>)
.option("cloudFiles.roleSessionName", <role-session-name>)
.option("cloudFiles.stsEndpoint", <sts-endpoint>)
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
/**
* Using an Azure service principal
*/
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
/**
* Using a Databricks service credential
*/
val manager = CloudFilesGCPResourceManager
.newManager
.option("cloudFiles.projectId", <project-id>)
.option("databricks.serviceCredential", <service-credential-name>)
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
/**
* Using a Google service account
*/
val manager = CloudFilesGCPResourceManager
.newManager
.option("cloudFiles.projectId", <project-id>)
.option("cloudFiles.client", <client-id>)
.option("cloudFiles.clientEmail", <client-email>)
.option("cloudFiles.privateKey", <private-key>)
.option("cloudFiles.privateKeyId", <private-key-id>)
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
setUpNotificationServices(<resource-suffix>)
を使用して、<prefix>-<resource-suffix>
という名前のキューとサブスクリプションを作成します (プレフィックスは、従来の Auto Loader ファイル通知モードで使用されるクラウド リソースで要約されているストレージ システムによって異なります。同じ名前の既存のリソースがある場合、Databricks は新しいリソースを作成する代わりに既存のリソースを再利用します。この関数は、ファイル通知オプションの識別子を使用してcloudFiles
ソースに渡すことができるキュー識別子を返します。これにより、 cloudFiles
ソース ユーザーは、リソースを作成するユーザーよりも少ないアクセス許可を持つことができます。
を呼び出す場合にのみnewManager
する"path"
オプションを提供します (setUpNotificationServices
;listNotificationServices
やtearDownNotificationServices
には必要ありません。これは、ストリーミング クエリを実行するときに使用するのと同じ path
です。
次のマトリックスは、ストレージの種類ごとに、どの Databricks Runtime でどの API メソッドがサポートされているかを示しています。
クラウドストレージ | セットアップAPI | リストAPI | ティアダウンAPI |
---|---|---|---|
Amazon S3 | すべてのバージョン | すべてのバージョン | すべてのバージョン |
ADLSの | すべてのバージョン | すべてのバージョン | すべてのバージョン |
GCSの | Databricks Runtime 9.1 以降 | Databricks Runtime 9.1 以降 | Databricks Runtime 9.1 以降 |
Azure Blobストレージ | すべてのバージョン | すべてのバージョン | すべてのバージョン |
Auto Loader が作成したイベント通知リソースをクリーンアップします
Auto Loader は、ファイル通知リソースを自動的に破棄しません。 ファイル通知リソースを破棄するには、前のセクションで示したように、クラウド リソース マネージャーを使用する必要があります。 これらのリソースは、クラウド プロバイダーの UI または APIsを使用して手動で削除することもできます。
一般的なエラーのトラブルシューティング
このセクションでは、ファイル通知モードで Auto Loader を使用する場合の一般的なエラーとその解決方法について説明します。
Event Grid サブスクリプションを作成できませんでした
Auto Loader を初めて実行するときに次のエラー メッセージが表示される場合は、Event Grid が Azure サブスクリプションにリソース プロバイダーとして登録されていません。
java.lang.RuntimeException: Failed to create event grid subscription.
Event Grid をリソース プロバイダーとして登録するには、次の操作を行います。
- Azure portal で、サブスクリプションに移動します。
- 設定セクションの リソース プロバイダー をクリックします。
- 登録する the provider
Microsoft.EventGrid
.
Event Grid サブスクリプション操作を実行するために必要な承認
Auto Loader を初めて実行するときに次のエラー メッセージが表示される場合は、 共同作成者 ロールが Event Grid のサービス プリンシパルとストレージ アカウントに割り当てられていることを確認します。
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Event Grid クライアントがプロキシをバイパス
Databricks Runtime 15.2 以降では、Auto Loader の Event Grid 接続では、デフォルトによるシステム プロパティのプロキシ設定が使用されます。Databricks Runtime 13.3 LTS、14.3 LTS、および 15.0 から 15.2 では、 Spark Config プロパティを spark.databricks.cloudFiles.eventGridClient.useSystemProperties true
に設定することで、プロキシを使用するように Event Grid 接続を手動で構成できます。 「Databricks での Spark 構成プロパティの設定」を参照してください。