メインコンテンツまでスキップ

構造化ストリーミングの出力モードを選択する

この記事では、ステートフル ストリーミングの出力モードの選択について説明します。 集約を含むステートフルストリームのみ、出力モード設定が必要です。

結合は追加出力モードのみをサポートし、出力モードは重複排除に影響を与えません。 任意のステートフル演算子 mapGroupsWithStateflatMapGroupsWithState は、独自のカスタム ロジックを使用してレコードを出力するため、ストリームの出力モードは動作に影響を与えません。

ステートレス ストリーミングの場合、すべての出力モードは同じように動作します。

出力モードを正しく構成するには、ステートフル ストリーミング、ウォーターマーク、トリガーについて理解する必要があります。 次の記事を参照してください。

出力モードとは何ですか?

構造化ストリーミング クエリの出力モードは、クエリの演算子が各トリガー中に出力するレコードを決定します。 出力できるレコードには、次の 3 つのタイプがあります。

  • 今後の処理が変更されないことを記録します。
  • 前回のトリガー以降に変更されたレコード。
  • state テーブル内のすべてのレコード。

ステートフルな演算子によって生成される特定の行はトリガーごとに変化する可能性があるため、ステートフルな演算子にとって、出力するレコードのタイプを知ることは重要です。 たとえば、ストリーミング集計演算子が特定のウィンドウでより多くの行を受け取ると、そのウィンドウの集計値がトリガー間で変更される可能性があります。

ステートレス演算子の場合、レコードの種類の違いは演算子の動作に影響しません。 ステートレス演算子がトリガー中に出力するレコードは、常にそのトリガー中に処理されたソースレコードです。

使用可能な出力モード

特定のトリガー中に出力するレコードをオペレーターに指示する3つの出力モードがあります。

出力 Mode

説明

追加モード (デフォルト)

デフォルトでは、ストリーミングクエリは追加モードで実行されます。 このモードでは、演算子は将来のトリガーで変更されない行のみを出力します。 ステートフル演算子は、ウォーターマークを使用して、これがいつ発生するかを判断します。

更新モード

更新モードでは、オペレーターは、出力されたレコードが後続のトリガーで変更される可能性がある場合でも、トリガー中に変更されたすべての行を出力します。

コンプリートモード

コンプリート モードは、ストリーミング集計でのみ機能します。 complete モードでは、演算子によって生成されたすべての結果の行がダウンストリームに出力されます。

本番運用に関する考慮事項

多くのステートフル ストリーミング操作では、追加モードと更新モードのどちらかを選択する必要があります。 次のセクションでは、決定に役立つ可能性のある考慮事項について説明します。

アプリケーションセマンティクス

アプリケーション セマンティクスは、ダウンストリーム アプリケーションがストリーミング データをどのように使用するかを記述します。

ダウンストリーム サービスがダウンストリーム書き込みごとに 1 つのアクションを実行する必要がある場合は、ほとんどの場合、追加モードを使用します。 たとえば、シンクに書き込まれるすべての新しいレコードに対して通知を送信するダウンストリーム通知サービスがある場合、追加モードでは、各レコードが一度だけ書き込まれます。 更新モードでは、状態情報が変更されるたびにレコードが書き込まれるため、多数の更新が発生します。

ダウンストリーム サービスで新しい結果が必要な場合は、更新モードにより、シンクが可能な限り最新の状態に保たれます。 例としては、リアルタイムの特徴を読み取る機械学習モデルや、リアルタイム集計を追跡するアナリティクス ダッシュボードなどがあります。

オペレーターとシンクの互換性

構造化ストリーミングは、Apache Spark で使用可能なすべての操作をサポートしているわけではなく、一部のストリーミング操作はすべての出力モードでサポートされているわけではありません。 オペレーターの制限の詳細については、 OSS ストリーミングのドキュメントを参照してください。

すべてのシンクがすべての出力モードをサポートしているわけではありません。 すべての Unity Catalog マネージドテーブルをサポートする Delta Lake と Kafka はどちらも、すべての出力モードをサポートしています。 シンクの互換性の詳細については、 OSS ストリーミング ドキュメントを参照してください。

レイテンシとコスト

出力モードは、レコードを書き込む前に経過する必要がある時間に影響を与え、書き込まれるデータの頻度と量は、ストリーミング パイプラインに関連するコストに影響を与える可能性があります。

追加モードでは、ステートフルな演算子は、ステートフルな結果が確定した後 (少なくともウォーターマークの遅延と同じ長さ) にのみ結果を出力するように強制されます。 追加出力モードでのウォーターマーク遅延が 1 hour の場合、レコードがダウンストリームに出力されるまでに少なくとも 1 時間の遅延があることを意味します。

更新モードでは、トリガーごとに集計値ごとに 1 回の書き込みが行われます。 レコードごとの書き込みごとにシンクが課金される場合、ウォーターマークの遅延が経過する前にレコードが何度も更新されると、これは高額になる可能性があります。

設定例

次のコード例は、Unity Catalog テーブルへの更新のストリーミング出力モードの構成を示しています。

Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)

# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)

# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)

PySpark DataStreamWriter.outputMode または Scala DataStreamWriter.outputMode については、OSS のドキュメントを参照してください。

ステートフル ストリーミング モードと出力モードの例

次の例は、ステートフル ストリーミングの出力モードがウォーターマークとどのように相互作用するかを推論することを目的としています。

ウォーターマークの遅延が 15 分の店舗で 1 時間ごとに生成される総収益を計算するストリーミング集計について考えてみます。 最初のマイクロバッチは、次のレコードを処理します。

  • 午後2時40分に15ドル
  • 午後2時30分に10ドル
  • 午後3時10分に30ドル

この時点で、エンジンのウォーターマークは、表示された最大時間 (午後 3 時 10 分) から 15 分 (遅延) を差し引くため、午後 2 時 55 分です。 ストリーミング集計演算子の状態は次のとおりです。

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: 30ドルです

次の表は、各出力モードで何が起こるかの概要を示しています。

出力モード

結果と理由

追加

ストリーミング集計演算子は、ダウンストリームに対して何も出力しません。 これは、これらのウィンドウの両方が、後続のトリガーで新しい値が表示されると変更される可能性があるためです。午後 2 時 55 分のウォーターマークは、午後 2 時 55 分以降のレコードがまだ到着する可能性があることを示し、それらのレコードは [2pm, 3pm] ウィンドウまたは [3pm, 4pm] ウィンドウのいずれかに分類される可能性があります。

更新

オペレーターは、両方のレコードが更新を受け取ったため、両方のレコードを出力します。

完了

オペレーターはすべてのレコードを出力します。

ここで、ストリームがもう 1 つのレコードを受け取るとします。

  • 午後3時20分に20ドル

ウォーターマークは、エンジンが午後 3 時 20 分から 15 分を差し引くため、午後 3 時 05 分に更新されます。 この時点で、ストリーミング集計演算子の状態は次のとおりです。

  • [2pm, 3pm]: $25
  • [3pm, 4pm]:50ドル

次の表は、各出力モードで何が起こるかの概要を示しています。

出力モード

結果と理由

追加

ストリーミング集計演算子は、午後 3 時 05 分のウォーターマークが [2pm, 3pm] ウィンドウの終わりより大きいことを観察します。 ウォーターマークの定義により、そのウィンドウは変更できなくなり、 [2pm, 3pm] ウィンドウが出力されます。

更新

ストリーミング集計演算子は、状態の値が $30 から $50 に変更されたため、 [3pm, 4pm] ウィンドウを出力します。

完了

オペレーターはすべてのレコードを出力します。

次に、ステートフル演算子が各追加モードでどのように動作するかをまとめます。

  • 追加モードでは、ウォーターマーク遅延の後に一度レコードを書き込みます。
  • 更新モードでは、前のトリガー以降に変更されたレコードを書き込みます。
  • complete モードでは、ステートフル オペレータによって生成されたすべてのレコードを書き込みます。