構造化ストリーミングの出力モードを選択します

この記事では、ステートフル ストリーミングの出力モードの選択について説明します。 集約を含むステートフルストリームのみ、出力モード設定が必要です。

結合は追加出力モードのみをサポートし、出力モードは重複排除に影響を与えません。 任意のステートフル演算子 mapGroupsWithStateflatMapGroupsWithState は、独自のカスタム ロジックを使用してレコードを出力するため、ストリームの出力モードは動作に影響を与えません。

ステートレス ストリーミングの場合、すべての出力モードは同じように動作します。

出力モードを正しく構成するには、ステートフル ストリーミング、ウォーターマーク、トリガーを理解する必要があります。 次の記事を参照してください。

出力モードとは何ですか?

構造化ストリーミング クエリの出力モードによって、各トリガー中にクエリの演算子が出力するレコードが決まります。 出力できるレコードには、次の 3 つのタイプがあります。

  • 将来の処理が変更されないことを記録します。

  • 最後のトリガー以降に変更されたレコード。

  • 状態テーブル内のすべてのレコード。

ステートフル・オペレーターによって生成される特定の行はトリガーごとに変更される可能性があるため、出力するレコードのタイプを知ることは、ステートフル・オペレーターにとって重要です。 たとえば、ストリーミング集計演算子が特定のウィンドウに対してより多くの行を受け取ると、そのウィンドウの集計値がトリガー間で変更される可能性があります。

ステートレス演算子の場合、レコードの種類の区別は演算子の動作に影響しません。 ステートレス オペレーターがトリガー中に発行するレコードは、常にそのトリガー中に処理されるソース レコードです。

使用可能な出力モード

特定のトリガー時にどのレコードを出力するかをオペレーターに指示する出力モードは 3 つあります。

出力Mode

説明

追加モード(デフォルト)

デフォルトでは、ストリーミング クエリは追加モードで実行されます。 このモードでは、演算子は将来のトリガーで変更されない行のみを出力します。 ステートフル演算子は、ウォーターマークを使用して、これがいつ発生するかを判断します。

更新モード

更新モードでは、オペレーターは、出力されたレコードが後続のトリガーで変更される可能性がある場合でも、トリガー中に変更されたすべての行を出力します。

コンプリートモード

完全モードはストリーミング集約でのみ機能します。 完全モードでは、演算子によって生成されたすべての結果行がダウンストリームに出力されます。

本番運用の考慮事項

多くのステートフル ストリーミング操作では、追加モードと更新モードのどちらかを選択する必要があります。 次のセクションでは、決定に役立つ可能性のある考慮事項の概要を説明します。

注:

完全モードにはいくつかのアプリケーションがありますが、データの規模が大きくなるとパフォーマンスが低下する可能性があります。 Databricks では、多くのステートフル操作の増分処理を伴う完全モードに関連付けられたセマンティック保証を得るために、マテリアライズド ビューを使用することを推奨しています。 「Databricks SQL でマテリアライズド ビューを使用する」を参照してください。

アプリケーションのセマンティクス

アプリケーション セマンティクスは、ダウンストリーム アプリケーションがストリーミング データをどのように使用するかを説明します。

ダウンストリーム サービスがダウンストリーム書き込みごとに 1 つのアクションを実行する必要がある場合は、ほとんどの場合、追加モードを使用します。 たとえば、シンクに書き込まれる新しいレコードごとに通知を送信するダウンストリーム通知サービスがある場合、追加モードでは各レコードが 1 回だけ書き込まれることが保証されます。 更新モードでは、状態情報が変更されるたびにレコードが書き込まれるため、多数の更新が発生します。

下流のサービスに最新の結果が必要な場合、更新モードによりシンクが可能な限り最新の状態に保たれます。 例としては、リアルタイム の特徴を読み取る 機械学習 モデルや、リアルタイム の集計を追跡する アナリティクス ダッシュボードなどが挙げられます。

オペレーターとシンクの互換性

構造化ストリーミングは、Apache Spark で利用可能なすべての操作をサポートしているわけではなく、一部のストリーミング操作はすべての出力モードでサポートされていません。 オペレータの制限の詳細については、 OSS ストリーミング ドキュメントを参照してください。

すべてのシンクがすべての出力モードをサポートしているわけではありません。 すべての Unity Catalog マネージドテーブルをサポートする Delta Lake と Kafka はどちらも、すべての出力モードをサポートしています。 シンクの互換性の詳細については、 OSS ストリーミング ドキュメントを参照してください。

レイテンシーとコスト

出力モードは、レコードを書き込む前に経過する必要がある時間に影響し、書き込まれるデータの頻度と量は、ストリーミング パイプラインに関連するコストに影響を与える可能性があります。

追加モードでは、ステートフルな演算子は、ステートフルな結果がファイナライズされた後 (少なくともウォーターマークの遅延と同じ長さ) にのみ結果を出力するように強制されます。 追加出力モードでのウォーターマーク遅延が 1 hour の場合、レコードがダウンストリームに出力されるまでに少なくとも 1 時間の遅延があることを意味します。

更新モードでは、集計値ごとにトリガーごとに 1 回の書き込みが行われます。 シンクがレコードごとの書き込みごとに課金する場合、ウォーターマークの遅延が経過する前にレコードが何度も更新されると、コストが高くなる可能性があります。

設定例

次のコード例は、Unity Catalog テーブルへのストリーミング更新の出力モードの構成を示しています。

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)
// Append output mode (default)
df.writeStream
  .toTable("target_table")


// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

PySpark DataStreamWriter.outputModeまたはScala DataStreamWriter.outputModeの OSS ドキュメントを参照してください。

ステートフルストリーミングと出力モードの例

次の例は、出力モードがステートフル ストリーミングのウォーターマークとどのように相互作用するかを理解するのに役立ちます。

ウォーターマーク遅延が 15 分である店舗で 1 時間ごとに生成される総収益を計算するストリーミング集計について考えてみましょう。 最初のマイクロバッチは、次のレコードを処理します。

  • 午後2時40分に15ドル

  • 午後2時30分に10ドル

  • 午後3時10分に$ 30

この時点で、エンジンのウォーターマークは、最大時間 (午後 3 時 10 分) から 15 分 (遅延) を差し引くため、午後 2 時 55 分です。 ストリーミング集約演算子の状態は次のとおりです。

  • [2pm, 3pm]:25ドル

  • [3pm, 4pm]:$ 30

次の表は、各出力モードで何が起こるかをまとめたものです。

出力モード

結果と理由

追加

ストリーミング集約演算子は、ダウンストリームに何も出力しません。 これは、後続のトリガーで新しい値が表示されると、これらのウィンドウの両方が変更される可能性があるためです: 午後 2 時 55 分のウォーターマークは、午後 2 時 55 分以降のレコードが引き続き到着する可能性があり、それらのレコードが [2pm, 3pm] ウィンドウまたは [3pm, 4pm] ウィンドウのいずれかに分類される可能性があることを示します。

更新

オペレーターは、両方のレコードが更新を受信したため、両方のレコードを出力します。

完了

オペレーターはすべてのレコードを出力します。

ここで、ストリームがさらに 1 つのレコードを受信したとします。

  • 午後3時20分に$ 20

エンジンが午後 3 時 20 分から 15 分を差し引くため、ウォーターマークは午後 3 時 5 分に更新されます。 この時点で、ストリーミング集約演算子の状態は次のようになります。

  • [2pm, 3pm]:25ドル

  • [3pm, 4pm]:$ 50

次の表は、各出力モードで何が起こるかをまとめたものです。

出力モード

結果と理由

追加

ストリーミング集約演算子は、午後 3:05 のウォーターマークが[2pm, 3pm]ウィンドウの終了時刻より大きいことを検出します。 透かしの定義により、そのウィンドウは変更できなくなるため、 [2pm, 3pm] ウィンドウが出力されます。

更新

状態値が $30 から $50 に変更されたため、ストリーミング集計演算子は[3pm, 4pm]ウィンドウを発行します。

完了

オペレーターはすべてのレコードを出力します。

次に、ステートフル演算子が各追加モードでどのように動作するかをまとめます。

  • 追加モードでは、ウォーターマークの遅延後にレコードを 1 回書き込みます。

  • 更新モードでは、直前のトリガー以降に変更されたレコードを書き込みます。

  • 完全モードでは、ステートフル演算子によって生成されたすべてのレコードを書き込みます。