メインコンテンツまでスキップ

Azure Blob Storage ファイル ソースと Azure Queue Storage (レガシ)

important

このドキュメントは廃止されており、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジはサポートされなくなりました。 「Auto Loaderとは」を参照してください。

ABS-AQS コネクタは、Azure Queue Storage (AQS) を使用して、すべてのファイルを繰り返し一覧表示せずに Azure Blob Storage (ABS) コンテナーに書き込まれた新しいファイルを検索する最適化されたファイル ソースを提供します。 これには、次の 2 つの利点があります。

  • 低レイテンシ:ABSにネストされたディレクトリ構造をリストする必要がないため、これは遅く、リソースを大量に消費します。
  • コストの削減:ABSに対してコストのかかるLIST APIリクエストを行う必要はもうありません。
注記

ABS-AQS ソースは、イベントを使用するときに AQS キューからメッセージを削除します。 他のパイプラインでこのキューからメッセージを使用する場合は、最適化されたリーダー用に別の AQS キューを設定します。 複数の Event Grid サブスクリプションを設定して、異なるキューに発行できます。

ABS-AQS ファイル ソースを使用する

ABS-AQS ファイル ソースを使用するには、次の操作を行う必要があります。

  • Azure Event Grid サブスクリプションを利用して ABS イベント通知を設定し、それらを AQS にルーティングします。 「Blob Storage イベントへの対応」を参照してください。

  • fileFormat オプションと queueUrl オプション、およびスキーマを指定します。例えば:

    Python
    spark.readStream \
    .format("abs-aqs") \
    .option("fileFormat", "json") \
    .option("queueName", ...) \
    .option("connectionString", ...) \
    .schema(...) \
    .load()

Azure Queue Storage と Blob Storage で認証する

Azure Queue Storage と Blob Storage で認証するには、Shared Access Signature (SAS) トークンまたはストレージ アカウント キーを使用します。 キューがデプロイされているストレージ アカウントの接続文字列 (SAS トークンまたはストレージ アカウントへのアクセス キーを含む) を指定する必要があります。 詳細については、「Azure Storage 接続文字列の構成」を参照してください。

また、Azure Blob Storage コンテナーへのアクセスを提供する必要もあります。 Azure Data Lake Storage Gen2Storage Azure Blobコンテナーへのアクセスを構成する方法については、「 Storage と Blob Storage への接続 」を参照してください。

注記

Databricks では、接続文字列の提供に Manage secrets を使用することを強くお勧めします。

構成

オプション

タイプ

デフォルト

説明

allowOverwrites(上書きを許可する)

ブール値

true

上書きされた BLOB を再処理するかどうか。

コネクションストリング

文字列

なし(必須パラメータ)

キューにアクセスするための 接続文字列

fetchParallelism (フェッチ並列処理)

整数タイプ

1

キューイングサービスからメッセージを取得するときに使用するスレッドの数。

ファイル形式

文字列

なし(必須パラメータ)

parquetjsoncsvtextなどのファイルの形式。

ignoreFileDeletion (無視ファイルの削除)

ブール値

false

ライフサイクル設定がある場合、またはソースファイルを手動で削除する場合は、このオプションを trueに設定する必要があります。

maxFileAge (マックス ファイル年齢)

整数タイプ

604800

重複処理を防ぐために、ファイル通知を状態として保存する期間 (秒単位) を決定します。

pathRewritesの

JSON 文字列。

"{}"

マウントポイントを使用する場合は、 container@storageAccount/key パスのプレフィックスをマウントポイントで書き換えることができます。 書き換えることができるのはプレフィックスだけです。 たとえば、設定 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"}の場合、パス wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json は次のように書き換えられます dbfs:/mnt/data-warehouse/2017/08/fileA.json.

キューフェッチインターバル

期間を表す文字列。例:2 分の場合は 2m

"5s"

キューが空の場合にフェッチ間で待機する時間。 Azure は AQS への API 要求ごとに課金されます。 したがって、データが頻繁に到着しない場合は、この値を長い期間に設定できます。 キューが空でない限り、継続的にフェッチします。 新しいファイルが 5 分ごとに作成される場合は、AQS コストを削減するために高い queueFetchInterval を設定することをお勧めします。

キュー名

文字列

なし(必須パラメータ)

AQS キューの名前。

ドライバー ログに Fetched 0 new events and 3 old events.のようなメッセージがたくさんあり、新しいイベントよりも古いイベントが多く観察される傾向がある場合は、ストリームのトリガー間隔を短くする必要があります。

一部のファイルが処理される前に削除される可能性があると予想される Blob Storage 上の場所からファイルを使用している場合は、エラーを無視して処理を続行するように次の構成を設定できます。

Python
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

よくある質問(FAQ)

ignoreFileDeletion False (デフォルト) でオブジェクトが削除された場合、パイプライン全体は失敗しますか?

はい、ファイルが削除されたことを示すイベントを受け取った場合、パイプライン全体が失敗します。

maxFileAgeはどのように設定すればよいですか?

Azure Queue Storage は、少なくとも 1 回のメッセージ配信セマンティクスを提供するため、重複除去のために状態を保持する必要があります。 maxFileAge のデフォルト設定は 7 日で、これはキュー内のメッセージの最大 TTL と同じです。

この記事は役に立ちましたか?