新しいファイルの到着時にジョブをトリガーする
ファイル到着トリガー を使用すると、新しいファイルがAmazon S3、 Azureストレージ、Google Cloud Storageなどの外部ロケーションに到着したときに、ジョブの実行をトリガーできます。 この機能は、スケジュールされたジョブの効率が不規則な新しいデータ到着によって損なわれる場合に便利です。
ファイル到着トリガーの仕組み
ファイル到着トリガーは、新しいファイルを毎分チェックするために最善を尽くしますが、これは基になるクラウド ストレージのパフォーマンスによって影響を受ける可能性があります。 ファイル到着トリガーでは、ストレージの場所にあるファイルのリストに関連するクラウド プロバイダーのコスト以外の追加コストは発生しません。
ファイル到着トリガーは、 Unity Catalog 外部ロケーションまたはボリュームのルート、または外部ロケーションまたはボリュームのサブパスを監視するように構成できます。 たとえば、 Unity Catalog ボリューム /Volumes/mycatalog/myschema/myvolume/の場合、ファイル到着トリガーの有効なパスは次のとおりです。
/Volumes/mycatalog/myschema/myvolume/
/Volumes/mycatalog/myschema/myvolume/mydirectory/
ファイル到着トリガーは、構成された場所のすべてのサブディレクトリで新しいファイルを再帰的にチェックします。たとえば、ロケーション /Volumes/mycatalog/myschema/myvolume/mydirectory/ のファイル到着トリガーを作成し、このロケーションに次のサブディレクトリがあるとします。
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirA
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirB
/Volumes/mycatalog/myschema/myvolume/mydirectory/subdirC/subdirD
トリガーは、 mydirectory、 subdirA、 subdirB、 subdirC、 subdirC/subdirDの新しいファイルをチェックします。
ファイル到着トリガーとファイル・イベント
最適なパフォーマンスを得るには、 ファイルイベントに対して外部ロケーションを有効にする必要があります。 外部ロケーションでファイルイベントが有効になっている場合、 Databricks は内部サービスを使用して、クラウドプロバイダーからの変更通知を処理することで取り込みメタデータを追跡します。 このサービスは、サービスが決定したローリング保持期間にわたって作成または更新された最新のファイルのメタデータを保持し、ファイル処理の効率を向上させます。
外部ロケーションでファイル・イベントを有効にしてから数分以内に、その外部ロケーションでカバーされるパスを監視する既存のファイル到着トリガーは、ファイル・イベントの有効化によるメリットを得始め、新しいトリガーは数秒以内にメリットを得ます。
外部ロケーションでのファイル イベントのパフォーマンスと容量の利点に関する詳細については、「制限事項」を参照してください。ファイルイベントに関するよくある質問については、「ファイルイベント FAQ」を参照してください。
始める前に
ファイル到着トリガーを使用するには、次のものが必要です。
-
ワークスペースで Unity Catalog が有効になっている必要があります。
-
ストレージの場所は、 Unity Catalogで構成されたボリュームまたは外部ロケーションのいずれかである必要があります。 「Unity Catalogボリュームとは」および「外部ロケーションの概要」を参照してください。
Databricks では、管理対象ファイル・イベントの外部ロケーションを有効にすることをお勧めします。 これらの外部ロケーション上のボリュームは、デフォルトでファイルイベントをサポートします。ファイル・イベントを有効にするには、外部ロケーションの所有者であるか、外部ロケーションに対する
MANAGE権限を持っている必要があります。ファイル・イベントの利点に関する情報については、 ファイル・イベントを使用したファイル到着トリガーを参照してください。 -
ストレージの場所に対する
READ権限と、ジョブに対する CAN MANAGE 権限が必要です。ジョブのアクセス許可の詳細については、ジョブ ACLを参照してください。
ファイル到着トリガーを追加する
ファイル到着トリガーをジョブに追加するには:
- Databricks ワークスペースのサイドバーで、 ジョブとパイプライン をクリックします。
- 必要に応じて、[ ジョブ ] と [自分が所有] フィルターを選択します。
- ジョブの 名前 リンクをクリックします。
- 右側の ジョブの詳細 ペインで、 トリガーの追加 をクリックします。
- トリガーの種類 で、 ファイル到着 を選択します。
- [ ストレージの場所 ] に、監視するルートの URL または外部ロケーションのサブパス Unity Catalog または Unity Catalog ボリュームのルートまたはサブパスを入力します。
- (オプション) 詳細オプション ( トリガー間の最小時間(秒) と 最後の変更後に待機 (秒) ) を構成して、実行のトリガー頻度を制御します。設定例については、実行のトリガー頻度を制御するを参照してください。
- 構成を検証するには、[ 接続のテスト ] をクリックします。
- [ 保存 ]をクリックします。
後でこのトリガーを編集、停止する、または削除するには、 「ジョブの詳細」 ペインの 「スケジュールとトリガー」 セクションを使用します。「既存のトリガーを管理する」を参照してください。
実行がトリガーされる頻度を制御します。
ファイル到着トリガーの2つの詳細オプションにより、ファイルの到着がジョブ実行にどのように変換されるかが制御されます。これらのオプションは、 クールダウン と デバウンス という2つの一般的なレート制御パターンを適用します:
- トリガー間の最小時間(秒) : ジョブをこの間隔あたり最大1回の実行に制限します (実行間のクールダウン)。実行が完了した後、クールダウン中に到着したファイルは、間隔が経過するまで新しい実行を開始しません。このオプションを使用して、実行が作成される頻度を制限し、頻繁な到着によって連続した実行が作成されないようにします。
- 最後の変更後の待ち時間(秒) :最新のファイルが到着してから、実行を開始するまでこの長さ待機します。新しいファイルが到着するたびにタイマーがリセットされます(デバウンス)。このオプションは、ファイルがバッチで到着し、すべてのファイルが着地した後に、バッチ全体を1回の実行で処理したい場合に使用します。
いずれかのオプションを単独で設定することも、両方を一緒に設定することもできます。次の例を参照してください。
最大でも15分ごとに実行
ファイルが到着するたびに実行を作成するには、ただし15分ごとに1回を超える頻度ではないように、以下の詳細オプションを設定します:
- トリガー間の最小時間(秒) :
900
各実行が完了した後、ファイルが到着し続けても、トリガーは別の実行を開始する前に900秒(15分)待機します。これにより、実行の作成は15分あたり最大1回に制限されます。
完全なバッチが到着するまで待機します
ファイルがバッチで到着し、各バッチを単一の実行で処理したい場合は、 最後の変更後に待機 (秒) を、バッチ間の間隔よりも短く、かつバッチ内のファイル間の間隔よりも長い値に設定してください。例えば、新しいバッチが約5分ごとに開始される場合、以下の詳細オプションを設定してください。
- 最後の変更後の待ち時間(秒) :
60
新しいファイルが到着するたびにタイマーがリセットされるため、新しい到着がないまま60秒が経過した後にのみ、トリガーが実行を開始します。この設定は、バッチ内のファイルが互いに60秒以内に到着するため、バッチの途中でタイマーが期限切れになることはなく、また、バッチ間隔が60秒以上であるため、連続するバッチが単一の実行にマージされることはない、と想定しています。
頻度を制限し、完了したバッチを待機します。
実行が作成される頻度を制限し、バッチの途中で実行を開始するのを避ける場合は、両方のオプションを組み合わせることができます。例えば:
- トリガー間の最小時間(秒) :
900 - 最後の変更後の待ち時間(秒) :
60
この構成では、トリガーはバッチの着地が完了するまで (新しいファイルがない状態で60秒間) 待機してから実行を開始し、15分あたり1回を超える実行は開始しません。
到着時にファイルを検出して処理する
ファイル到着トリガーをトリガーしたファイルを処理するには、 Auto Loader を使用できます。Auto Loader は、正確に 1 回だけ実行することを保証しながら、新しいファイルを段階的に効率的に処理します。たとえば、以下のスニペットを使用してファイルを Delta テーブルに読み込みます。
このソリューションを使用するには、ファイル到着トリガーを使用してジョブを作成し、以下のコードを含むノートブックを追加します。各[REPLACE]プレースホルダーを適切な値に置き換えます。
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.
sink_table = "[REPLACE]" # Delta table to write to
# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", checkpoint_location) \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.writeStream \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.toTable(sink_table)
新しいファイルをカスタム ロジックで処理する必要があり、新しいファイルの URL のみを検出する場合は、以下のコード スニペットに示すように、代わりにforeachBatch使用できます。foreachBatch少なくとも 1 回の処理保証のみを提供することに注意してください。foreachBatch使用に関する詳細については、 foreachBatch を使用して任意のデータ シンクに書き込むを参照してください。
# Configuration
file_location = "[REPLACE]" # The same URL configured for the file arrival trigger.
checkpoint_location = "[REPLACE]" # a separate URL (outside `file_location`) used to store the Auto Loader checkpoint, which enables exactly-once processing.
def process_batch(batch_df, batch_id):
file_url = batch_df.select("path").collect()[0].path
# [REPLACE] Your custom function for processing newly arrived files
# Use Auto Loader to discover new files.
# Do not modify code below this line
streamingQuery = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("cloudFiles.useManagedFileEvents","true") \
.load(file_location) \
.drop("content") \
.writeStream \
.foreachBatch(process_batch) \
.option("checkpointLocation", checkpoint_location) \
.trigger(availableNow = True) \
.start()
失敗したファイル到着トリガーの通知を受け取る
ファイル到着トリガーの評価に失敗した場合に通知を受け取るには、ジョブ失敗時の Eメール 通知またはシステム宛先通知を設定します。 ジョブに通知を追加するを参照してください。
制限
-
新しいファイルのみが実行をトリガーします。 既存のファイルを同じ名前のファイルで上書きしても、実行はトリガーされません。
-
ファイル到着トリガーに使用するパスには、外部テーブルや、カタログとスキーマの管理された場所を含めることはできません。
-
ファイル到着トリガーに使用されるパスには、
*や?などのワイルドカードを含めることはできません。 -
ストレージの場所が Unity Catalog の外部ロケーションとして構成され、その外部ロケーションが ファイル イベントに対して有効になっている場合:
-
保存場所のファイル数に制限はありません。
-
不要なファイルの更新が多すぎると、タイムアウト時にトリガーでエラーが発生する可能性があります。
ファイル到着トリガーがUnity Catalogの外部ロケーションまたはボリュームのサブパスに設定されている場合、外部ロケーションのルートなど、そのサブパスの外側での変更により、トリガーが処理する必要があるメタデータが増加する可能性があります。 変更の多い環境では、これによりトリガーの処理時間制限を超え、エラー状態になる可能性があります。
これを防ぐには、監視するサブディレクトリに具体的にマップするUnity Catalogボリュームを作成し、そのボリュームのルートにファイル到着トリガーを設定します。 このアプローチでは、ターゲット パスがトリガーの有効なルートとして分離され、関連のないルート レベルの変更が削減され、トリガーがエラー状態になるのを防ぎます。
-
既存のファイルが変更され、そのメタデータがローリング保持期間外である場合、その変更は新しいファイルの到着として扱われ、ジョブの実行をトリガーします。これを防ぐには、不変のファイルのみを取り込むか、Auto Loaderでファイル到着トリガーを使用して取り込みの進行状況を追跡できます。
-
-
ファイル・イベントに対してストレージ・ロケーションが有効になっていない場合:
- 最大 50 個のジョブを、Databricks ワークスペース内のそのような場所でファイル到着トリガーを使用して構成できます。
- 保存場所には、最大 10,000 個のファイルを格納できます。設定されたストレージの場所が Unity Catalog 外部ロケーションまたはボリュームのサブパスである場合、10,000 ファイルの制限は、ストレージの場所のルートではなくサブパスに適用されます。 たとえば、ストレージの場所のルートには、そのサブディレクトリ全体で 10,000 を超えるファイルを含めることができますが、構成されたサブディレクトリは 10,000 ファイルの制限を超えてはなりません。
ファイルイベントの制限も参照してください。
S3 および GCS の外部ロケーションに存在しないパスでのファイル到着トリガー
設定されたディレクトリが存在しないか、Amazon S3 または Google Cloud Storage から削除された場合でも、ファイル到着トリガーはエラーなしで評価され続けます。この動作は、S3 と GCS の両方が存在しないディレクトリ、削除されたディレクトリ、空のディレクトリを区別しないために発生します。
その結果、存在しないまたは削除されたディレクトリ パスに対するファイル到着トリガー モニタリング は失敗せず、エラー通知も生成されません。 トリガーは評価を継続しますが、ファイルが見つからず、そのパスにファイルが再度追加されるまでジョブの実行をトリガーしません。これは予期される動作であり、エラー状態ではありません。