メインコンテンツまでスキップ

構造化ストリーミングのトリガー間隔を構成する

Apache Spark構造化ストリーミングはデータを段階的に処理します。 トリガー間隔は、構造化ストリーミングが新しいデータをチェックする頻度を制御します。ほぼリアルタイムの処理、スケジュールされたデータベースの更新、または 1 日または 1 週間のすべての新しいデータのバッチ処理のトリガー間隔を構成できます。

Auto Loaderとは何ですか? は構造化ストリーミングを使用してデータをロードします。トリガーの仕組みを理解することで、必要な頻度でデータを取り込みながらコストを柔軟に制御できるようになります。

重要

Databricksは、使用状況に応じてレイテンシとコストのバランスが取れるトリガーモードを設定することを推奨します。そうしないと、クラウドプロバイダーから予期せぬストレージ料金を請求される可能性があります。詳細については、 「クラウドストレージ料金の管理」をご覧ください。

トリガーモードの概要

次の表は、構造化ストリーミングで使用できるトリガー モードをまとめたものです。

トリガーMode

構文例(Python)

どのようなタスクにベストなのか

未指定(デフォルト)

N/A

3〜5秒の遅延を伴う汎用ストリーミング。0 ミリ秒間隔の processingTime トリガーと同等です。新しいデータが到着する限り、ストリーム処理は継続的に実行されます。

処理時間

.trigger(processingTime='10 seconds')

コストとパフォーマンスのバランス。システムがデータを頻繁にチェックしないようにすることでオーバーヘッドを削減します。

今すぐ入手可能

.trigger(availableNow=True)

スケジュールされた増分バッチ処理。ストリーミング ジョブがトリガーされた時点で利用可能な量のデータを処理します。

リアルタイムモード

.trigger(realTime='5 minutes')

不正行為の検出やリアルタイムのパーソナライゼーションなど、1 秒未満の処理を必要とする超低レイテンシの運用ワークロード。パブリックプレビュー。「5 分」はマイクロバッチの長さを示します。クエリのコンパイルなどのバッチごとのオーバーヘッドを最小限に抑えるには、5 分を使用します。

連続

.trigger(continuous='1 second')

サポートされていません。これは Spark OSS に含まれる実験的な機能です。代わりにリアルタイム モードを使用してください。

:::note サーバレスコンピュート

サーバレスコンピュートでは、 Trigger.AvailableNow()Trigger.Once()のみがサポートされます。 DatabricksはTrigger.AvailableNow()推奨しています。

サーバーレス コンピュートでの連続ストリーミングの場合は、連続モードでトリガー モードと連続パイプライン モードを使用します。

ストリーミングの制限事項をご覧ください。

:::

processingTime: 時間ベースのトリガー間隔

構造化ストリーミングでは、時間ベースのトリガー間隔を「固定間隔マイクロバッチ」と呼びます。processingTimeキーワードを使用して、時間の長さを.trigger(processingTime='10 seconds')などの文字列として指定します。

この間隔の構成により、システムが新しいデータが到着したかどうかを確認するチェックを実行する頻度が決まります。レイテンシ要件とデータがソースに到着する速度のバランスをとるために処理時間を構成します。

AvailableNow : 増分バッチ処理

重要

Databricks Runtime 11.3 LTS以降では、 Trigger.Onceは非推奨です。すべての増分バッチ処理ワークロードにはTrigger.AvailableNow使用してください。

AvailableNowトリガー オプションは、利用可能なすべてのレコードを増分バッチとして消費し、 maxBytesPerTriggerなどのオプションを使用してバッチ サイズを構成できます。サイズ設定オプションはデータ ソースによって異なります。

サポートされているデータソース

Databricks では、多くの構造化ストリーミング ソースからの増分バッチ処理に Trigger.AvailableNow を使用できます。 次の表に、各データソースに必要な最小サポート対象 Databricks Runtime バージョンを示します。

ソース

Databricks Runtime の最小バージョン

ファイルソース (JSON、Parquet など)

9.1 LTS

Delta Lake

10.4 LTS

Auto Loader

10.4 LTS

Apache Kafka

10.4 LTS

Kinesis

13.1

realTime: 超低レイテンシの運用ワークロード

構造化ストリーミングのリアルタイム モードでは、エンドツーエンドのレイテンシが末端で 1 秒未満、一般的なケースでは約 300 ミリ秒になります。卓球モードを効果的に設定して使用する方法の詳細については、構造化ストリーミングの「卓球モード」を参照してください。

Apache Spark には、継続的処理と呼ばれる追加のトリガー間隔があります。このモードは、Spark 2.3 以降、実験的なものとして分類されています。Databricks はこのモードをサポートも推奨もしていません。低レイテンシのユースケースでは、代わりにリアルタイム モードを使用します。

注記

このページの連続処理モードは、 LakeFlow Spark宣言型パイプラインの連続処理とは無関係です。

クラウドストレージのコストを管理する

デフォルトでは、トリガーモードを設定しない場合、構造化ストリーミングはトリガーモードをprocessingTime 、間隔を0に設定し、数ミリ秒ごとに新しいデータをチェックします。これにより、1日あたり大量のクラウドストレージAPI呼び出しが発生し、クラウドプロバイダーから予期せぬ料金が発生する場合があります。

Databricksは、レイテンシとコストの要件に適したトリガーモードを設定することを推奨します。時間ベースのトリガー間隔の構成については、 processingTimeを参照してください。

実行間のトリガー間隔を変更する

同じチェックポイントを使用しながら、実行間のトリガー間隔を変更できます。

間隔を変更するときの動作

構造化ストリーミングクエリがマイクロバッチの処理中に停止した場合、新しいトリガー間隔が適用される前に、そのマイクロバッチの処理が完了している必要があります。トリガー間隔を変更した後、マイクロバッチ処理が以前に指定した構成で実行されることが確認される場合があります。遷移後の想定される動作は以下のとおりです。

  • 時間ベースの間隔からAvailableNowまで: マイクロバッチは、利用可能なすべてのレコードが処理される前に、増分バッチとして処理される可能性があります。
  • AvailableNowから時間ベースの間隔まで: 最後のAvailableNowジョブがトリガーされた時点で利用可能だったすべてのレコードに対して処理が継続される可能性があります。

クエリの失敗から復旧する

クエリの失敗から回復するために増分バッチを使用しようとする場合、トリガー間隔を変更しても問題は解決しません。構造化ストリーミングには冪等のマイクロバッチが必要であるため、以前の失敗したバッチは完了する必要があります。 Apache Spark のフォールトトレランスに関するセマンティクスを参照してください。

障害を解決するには、ワーカー ノードのサイズを増やすなど、コンピュートの容量をスケールアップします。 まれに、新しいチェックポイントを使用してストリームを再開する必要がある場合があります。