構造化ストリーミングのトリガー間隔を構成する
この記事では、Databricks 上の構造化ストリーミングのトリガー間隔を構成する方法について説明します。
Apache Spark構造化ストリーミングはデータを段階的に処理します。 トリガー間隔は、構造化ストリーミングが新しいデータをチェックする頻度を制御します。ほぼリアルタイムの処理、スケジュールされたデータベースの更新、または 1 日または 1 週間のすべての新しいデータのバッチ処理のトリガー間隔を構成できます。
Auto Loaderとは何ですか? は構造化ストリーミングを使用してデータをロードします。トリガーの仕組みを理解することで、必要な頻度でデータを取り込みながらコストを柔軟に制御できるようになります。
トリガーモードの概要
次の表は、構造化ストリーミングで使用できるトリガー モードをまとめたものです。
トリガーMode | 構文例(Python) | どのようなタスクにベストなのか |
|---|---|---|
未指定(デフォルト) | N/A | 3〜5秒の遅延を伴う汎用ストリーミング。0 ミリ秒間隔の processingTime トリガーと同等です。新しいデータが到着する限り、ストリーム処理は継続的に実行されます。 |
処理時間 |
| コストとパフォーマンスのバランス。システムがデータを頻繁にチェックしないようにすることでオーバーヘッドを削減します。 |
今すぐ入手可能 |
| スケジュールされた増分バッチ処理。ストリーミング ジョブがトリガーされた時点で利用可能な量のデータを処理します。 |
リアルタイムモード |
| 不正行為の検出やリアルタイムのパーソナライゼーションなど、1 秒未満の処理を必要とする超低レイテンシの運用ワークロード。パブリックプレビュー。「5 分」はマイクロバッチの長さを示します。クエリのコンパイルなどのバッチごとのオーバーヘッドを最小限に抑えるには、5 分を使用します。 |
連続 |
| サポートされていません。これは Spark OSS に含まれる実験的な機能です。代わりにリアルタイム モードを使用してください。 |
processingTime: 時間ベースのトリガー間隔
構造化ストリーミングでは、時間ベースのトリガー間隔を「固定間隔マイクロバッチ」と呼びます。processingTimeキーワードを使用して、時間の長さを.trigger(processingTime='10 seconds')などの文字列として指定します。
この間隔の構成により、システムが新しいデータが到着したかどうかを確認するチェックを実行する頻度が決まります。レイテンシ要件とデータがソースに到着する速度のバランスをとるために処理時間を構成します。
AvailableNow : 増分バッチ処理
Databricks Runtime 11.3 LTS 以降では、 Trigger.Onceは非推奨です。すべての増分バッチ処理ワークロードにTrigger.AvailableNow使用します。
AvailableNowトリガー オプションは、利用可能なすべてのレコードを増分バッチとして消費し、 maxBytesPerTriggerなどのオプションを使用してバッチ サイズを構成できます。サイズ設定オプションはデータ ソースによって異なります。
サポートされているデータソース
Databricks では、多くの構造化ストリーミング ソースからの増分バッチ処理に Trigger.AvailableNow を使用できます。 次の表に、各データソースに必要な最小サポート対象 Databricks Runtime バージョンを示します。
ソース | Databricks Runtime の最小バージョン |
|---|---|
ファイルソース (JSON、Parquet など) | 9.1 LTS |
Delta Lake | 10.4 LTS |
Auto Loader | 10.4 LTS |
Apache Kafka | 10.4 LTS |
Kinesis | 13.1 |
realTime: 超低レイテンシの運用ワークロード
プレビュー
この機能は パブリック プレビュー段階です。
構造化ストリーミングのリアルタイム モードでは、エンドツーエンドのレイテンシが末端で 1 秒未満、一般的なケースでは約 300 ミリ秒になります。卓球モードを効果的に設定して使用する方法の詳細については、構造化ストリーミングの「卓球モード」を参照してください。
Apache Spark には、継続的処理と呼ばれる追加のトリガー間隔があります。このモードは、Spark 2.3 以降、実験的なものとして分類されています。Databricks はこのモードをサポートも推奨もしていません。低レイテンシのユースケースでは、代わりにリアルタイム モードを使用します。
このページの連続処理モードは、 LakeFlow Spark宣言型パイプラインの連続処理とは無関係です。
実行間のトリガー間隔を変更する
同じチェックポイントを使用しながら、実行間のトリガー間隔を変更できます。
間隔を変更するときの動作
マイクロバッチの処理中に構造化ストリーミング ジョブが停止した場合、新しいトリガー間隔が適用される前にそのマイクロバッチが完了する必要があります。その結果、トリガー間隔を変更した後、以前に指定した設定でマイクロバッチ処理が実行される場合があります。移行時に予想される動作は次のとおりです。
-
時間ベースの間隔から
AvailableNowに移行しています: 利用可能なすべてのレコードを増分バッチとして処理する前に、マイクロバッチが処理される可能性があります。 -
AvailableNowから時間ベースの間隔に移行しています: 最後のAvailableNowジョブがトリガーされたときに使用可能であったすべてのレコードの処理が続行される可能性があります。これは予想される動作です。
クエリ失敗からの回復
増分バッチに関連するクエリの失敗から回復しようとしている場合、バッチを完了する必要があるため、トリガー間隔を変更してもこの問題は解決されません。バッチの処理に使用されるコンピュート容量をスケールアップして、問題の解決を試みます。 まれに、新しいチェックポイントでストリームを再開する必要がある場合があります。