Amazon SQS を使用した Amazon S3 ソース (レガシー)
このドキュメントは廃止されており、更新されない可能性があります。 このコンテンツに記載されている製品、サービス、またはテクノロジはサポートされなくなりました。 「Auto Loaderとは」を参照してください。
S3-SQS コネクタはAmazonSimple Queue サービス (SQS) を使用して最適化されたAmazon S3 ソースを提供し、すべてのファイルを繰り返しリストすることなく、S3バケットに書き込まれた新しいファイルを検索できるようにします。これには、次の 2 つの利点があります。
- 低レイテンシー: S3 に大きなバケットをリストする必要はありません。これは、時間がかかり、リソースを大量に消費します。
- コストの削減: S3 に対してコストのかかる LIST API リクエストを行う必要はもうありません。
S3-SQS ソースは、イベントを消費するときに SQS キューからメッセージを削除します。 他のパイプラインを同じキューから消費する場合は、最適化されたリーダー用に別の SQS キューを設定します。 SNSを使用して、複数のSQSキューにメッセージを公開できます。
S3-SQS ファイル ソースを使用する
S3-SQS ファイル・ソースを使用するには、以下の操作を行う必要があります。
-
イベント通知を設定し、SQSにルーティングします。 「 Amazon S3 イベント通知の設定」を参照してください。
-
fileFormat
オプションとqueueUrl
オプション、およびスキーマを指定します。例えば:Pythonspark.readStream \
.format("s3-sqs") \
.option("fileFormat", "json") \
.option("queueUrl", ...) \
.schema(...) \
.load()
Amazon SQS と S3 による認証
Databricks は 、SQS への認証に Amazon のデフォルトの資格情報プロバイダーチェーン を使用します。Databricks クラスターは、SQS と S3 バケットにアクセスできるインスタンスプロファイルを使用して起動することをお勧めします。
このソースには、 sqs:ReceiveMessage
、 sqs:DeleteMessage
、および s3:GetObject
の権限が必要です。 Amazon: Access Denied
例外が発生した場合は、
ユーザーまたはプロファイルにこれらの権限があることを確認します。IAMAmazonSQS での Identity-Based () ポリシーの使用 およびを参照してください。詳細については、Bucket ポリシー Examples を参照してください。
構成
オプション | タイプ | デフォルト | 説明 |
---|---|---|---|
allowOverwrites(上書きを許可する) | ブール値 |
| 上書きされた BLOB を再処理するかどうか。 |
excludeRegex(正規表現を除外) | 文字列 | なし | パスに基づいてファイルを除外します。 |
fetchParallelism (フェッチ並列処理) | 整数タイプ | 1 | キューイングサービスからメッセージを取得するときに使用するスレッドの数。 |
ファイル形式 | 文字列 | なし(必須パラメータ) |
|
maxFileAge (マックス ファイル年齢) | 整数タイプ | 604800 | 重複処理を防ぐために、ファイル通知を状態として保存する期間 (秒単位) を決定します。 |
maxFilesPerTrigger | 整数タイプ | 1000 | 各トリガーで考慮される新しいファイルの最大数。 |
pathRewritesの | JSON 文字列。 |
| マウントポイントを使用する場合は、 |
キューURL | 文字列 | なし(必須パラメータ) | SQSキューのURL。 |
region | 文字列 | ローカルで解決されたリージョン | キューが定義されている地域。 |
sqsFetchInterval (英語) | 期間文字列 (例: |
| キューが空の場合にフェッチ間で待機する時間。AWS は SQS への API リクエストごとに課金します。したがって、データが頻繁に到着しない場合は、この値を長い期間に設定できます。SQS は、最大 20 秒の長時間ポーリングをサポートします。この値が 20 秒より長く設定されている場合、残りの時間スリープします。キューが空でない限り、継続的にフェッチします。 Kinesis Firehose や CloudTrail ログなど、新しいファイルが 5 分ごとに作成される場合は、SQS のコストを削減するために、 |
ドライバーログに Fetched 0 new events and 3 old events.
のようなメッセージがたくさんあり、新しいイベントよりも古いイベントが多く観察される傾向がある場合は、ストリームのトリガー間隔を短くするか、SQS キューの 可視性タイムアウト を増やすことができます。
一部のファイルが処理される前に削除される可能性がある S3 上の場所からファイルを使用している場合は、エラーを無視して処理を続行するように次の設定を設定できます。
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
よくある質問(FAQ)
SQSキューURLにはすでにリージョンエンドポイントが含まれているため、 region
フィールドを設定する必要はありませんよね?
SQS キューが Spark クラスターと同じリージョンにない場合は、リージョンを明示的に設定する必要があります。
sqsFetchInterval
- 値が 20 秒未満の場合、SQS のロング ポーリング タイムアウトをその特定の値に設定しますか? はい。
- 値が 20 秒より大きく、キューにデータがある場合、20 秒のタイムアウトで SQS ロング ポーリング要求を作成し続けますか?ロングポーリングを20秒に設定してリクエストを行いますが、SQSはすぐに戻ります。20秒も待つことはありません。
- 値が 20 秒より大きく、キューが空の場合、指定した間隔の後に 20 秒のタイムアウトを持つ長いポーリング要求を作成しますか?ロングポーリングを20秒に設定してリクエストを行います。SQSが何も返さない場合は、残りのインターバルをスリープします。SQSはREST API呼び出しごとに課金されるため、インターバル中はSQSに対してこれ以上リクエストを行いません。
ignoreFileDeletion
False (デフォルト) でオブジェクトが削除された場合、パイプライン全体は失敗しますか?
はい、ファイルが削除されたことを示すイベントを受け取った場合、パイプライン全体が失敗します。
maxFileAge
はどのように設定すればよいですか?
SQS は at-least-once メッセージ配信セマンティクスを提供するため、重複排除のために状態を保持する必要があります。 maxFileAge
のデフォルト設定は 7 日で、SQS のメッセージのデフォルトの TTL である 4 日よりも長くなります。キュー内のメッセージの保持期間を長く設定する場合は、それに応じてこの構成を設定します。