Azure Blob Storage ファイル ソースと Azure Queue Storage (レガシ)
このドキュメントは廃止されており、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジはサポートされなくなりました。 「Auto Loaderとは」を参照してください。
ABS-AQS コネクタは、Azure Queue Storage (AQS) を使用して、すべてのファイルを繰り返し一覧表示せずに Azure Blob Storage (ABS) コンテナーに書き込まれた新しいファイルを検索する最適化されたファイル ソースを提供します。 これには、次の 2 つの利点があります。
- 低レイテンシ:ABSにネストされたディレクトリ構造をリストする必要がないため、これは遅く、リソースを大量に消費します。
- コストの削減:ABSに対してコストのかかるLIST APIリクエストを行う必要はもうありません。
ABS-AQS ソースは、イベントを使用するときに AQS キューからメッセージを削除します。 他のパイプラインでこのキューからメッセージを使用する場合は、最適化されたリーダー用に別の AQS キューを設定します。 複数の Event Grid サブスクリプションを設定して、異なるキューに発行できます。
ABS-AQS ファイル ソースを使用する
ABS-AQS ファイル ソースを使用するには、次の操作を行う必要があります。
-
Azure Event Grid サブスクリプションを利用して ABS イベント通知を設定し、それらを AQS にルーティングします。 「Blob Storage イベントへの対応」を参照してください。
-
fileFormat
オプションとqueueUrl
オプション、およびスキーマを指定します。例えば:Pythonspark.readStream \
.format("abs-aqs") \
.option("fileFormat", "json") \
.option("queueName", ...) \
.option("connectionString", ...) \
.schema(...) \
.load()
Azure Queue Storage と Blob Storage で認証する
Azure Queue Storage と Blob Storage で認証するには、Shared Access Signature (SAS) トークンまたはストレージ アカウント キーを使用します。 キューがデプロイされているストレージ アカウントの接続文字列 (SAS トークンまたはストレージ アカウントへのアクセス キーを含む) を指定する必要があります。 詳細については、「Azure Storage 接続文字列の構成」を参照してください。
また、Azure Blob Storage コンテナーへのアクセスを提供する必要もあります。 Azure Data Lake Storage Gen2Storage Azure Blobコンテナーへのアクセスを構成する方法については、「 Storage と Blob Storage への接続 」を参照してください。
Databricks では、接続文字列の提供に Manage secrets を使用することを強くお勧めします。
構成
オプション | タイプ | デフォルト | 説明 |
---|---|---|---|
allowOverwrites(上書きを許可する) | ブール値 |
| 上書きされた BLOB を再処理するかどうか。 |
コネクションストリング | 文字列 | なし(必須パラメータ) | キューにアクセスするための 接続文字列 。 |
fetchParallelism (フェッチ並列処理) | 整数タイプ | 1 | キューイングサービスからメッセージを取得するときに使用するスレッドの数。 |
ファイル形式 | 文字列 | なし(必須パラメータ) |
|
ignoreFileDeletion (無視ファイルの削除) | ブール値 |
| ライフサイクル設定がある場合、またはソースファイルを手動で削除する場合は、このオプションを |
maxFileAge (マックス ファイル年齢) | 整数タイプ | 604800 | 重複処理を防ぐために、ファイル通知を状態として保存する期間 (秒単位) を決定します。 |
pathRewritesの | JSON 文字列。 |
| マウントポイントを使用する場合は、 |
キューフェッチインターバル | 期間を表す文字列。例:2 分の場合は |
| キューが空の場合にフェッチ間で待機する時間。 Azure は AQS への API 要求ごとに課金されます。 したがって、データが頻繁に到着しない場合は、この値を長い期間に設定できます。 キューが空でない限り、継続的にフェッチします。 新しいファイルが 5 分ごとに作成される場合は、AQS コストを削減するために高い |
キュー名 | 文字列 | なし(必須パラメータ) | AQS キューの名前。 |
ドライバー ログに Fetched 0 new events and 3 old events.
のようなメッセージがたくさんあり、新しいイベントよりも古いイベントが多く観察される傾向がある場合は、ストリームのトリガー間隔を短くする必要があります。
一部のファイルが処理される前に削除される可能性があると予想される Blob Storage 上の場所からファイルを使用している場合は、エラーを無視して処理を続行するように次の構成を設定できます。
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
よくある質問(FAQ)
ignoreFileDeletion
False (デフォルト) でオブジェクトが削除された場合、パイプライン全体は失敗しますか?
はい、ファイルが削除されたことを示すイベントを受け取った場合、パイプライン全体が失敗します。
maxFileAge
はどのように設定すればよいですか?
Azure Queue Storage は、少なくとも 1 回のメッセージ配信セマンティクスを提供するため、重複除去のために状態を保持する必要があります。 maxFileAge
のデフォルト設定は 7 日で、これはキュー内のメッセージの最大 TTL と同じです。