メインコンテンツまでスキップ

ウォーターマークを適用してデータ処理のしきい値を制御する

このページでは、ウォーターマークの基本概念について説明し、一般的なステートフル ストリーミング操作でウォーターマークを使用するための推奨事項を示します。 状態に保持されるデータの量が無限に拡大してメモリの問題が発生したり、長時間実行されるストリーミング操作中に処理の待ち時間が長くなったりすることを避けるために、ステートフル ストリーミング操作にウォーターマークを適用する必要があります。

ウォーターマークとは何ですか?

構造化ストリーミングでは、ウォーターマークを使用して、特定の状態エンティティの更新の処理を続行する時間のしきい値を制御します。 状態エンティティの一般的な例には、次のようなものがあります。

  • 時間枠での集計。
  • 2 つのストリーム間の結合の一意のキー。

ウォーターマークを宣言するときは、ストリーミング データフレーム のタイムスタンプ フィールドとウォーターマークのしきい値を指定します。 新しいデータが到着すると、ステートマネージャーは指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のすべてのレコードを処理します。

次の例では、ウィンドウカウントに10分のウォーターマーク閾値を適用します。

Python
from pyspark.sql.functions import window

(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)

この例では、次のようになります。

  • event_time列は、10 分間のウォーターマークと 5 分間のタンブリング ウィンドウを定義するために使用されます。
  • カウントは、重複しない 5 分間のウィンドウごとに観測された id ごとに収集されます。
  • 状態情報は、ウィンドウの終了時に最新の観測 event_timeより 10 分古いまで、カウントごとに保持されます。
重要

ウォーターマークしきい値は、指定されたしきい値内に到着したレコードが、定義されたクエリのセマンティクスに従って処理されることを保証します。指定されたしきい値を超えて到着する遅れたレコードは、クエリメトリックを使用して処理される可能性がありますが、これは保証されません。

ウォーターマークは処理時間とスループットにどのような影響を与えますか?

ウォーターマークは出力モードと対話して、データがシンクに書き込まれるタイミングを制御します。 ウォーターマークは処理する状態情報の総量を減らすため、効率的なステートフル ストリーミング スループットにはウォーターマークの効果的な使用が不可欠です。

注記

すべての出力モードがすべてのステートフル操作でサポートされているわけではありません。

ウォーターマークとウィンドウ集計の出力モード

次の表は、ウォーターマークが定義されたタイムスタンプで集計されたクエリの処理の詳細を示しています。

出力モード

挙動

追加

ウォーターマークしきい値を超えると、行はターゲット テーブルに書き込まれます。すべての書き込みは遅延しきい値に基づいて遅延されます。しきい値を超えると、古い集約状態は削除されます。

更新

結果が計算されると行がターゲット テーブルに書き込まれ、新しいデータが到着すると更新および上書きされる可能性があります。しきい値を超えると、古い集約状態は削除されます。

完了

集約状態は削除されません。ターゲット テーブルはトリガーごとに書き換えられます。

ウォーターマークとストリームストリーム結合の出力

複数のストリーム間の結合は追加モードのみをサポートし、一致したレコードは検出された各バッチに書き込まれます。 内部結合の場合、 Databricks は各ストリーミング データソースにウォーターマークのしきい値を設定することをお勧めします。 これにより、古いレコードの状態情報を破棄できます。 ウォーターマークがない場合、構造化ストリーミングは、結合の両側からすべてのキーを各トリガーで結合しようとします。

構造化ストリーミングには、外部結合をサポートするための特別なセマンティクスがあります。ウォーターマークは、キーが一致しなくなった後にキーに null 値を書き込む必要があることを示すため、外部結合では必須です。外部結合は、データ処理中に一致することのないレコードを記録するのに役立ちますが、結合は追加操作としてのみテーブルに書き込むため、この欠落データは遅延しきい値を超えるまで記録されません。

構造化ストリーミングの複数のウォーターマークポリシーによる遅延データのしきい値の制御

複数の構造化ストリーミング入力を操作する場合、複数のウォーターマークを設定して、遅延到着データの許容しきい値を制御できます。 ウォーターマークを設定すると、状態情報を制御でき、レイテンシーに影響を与えることができます。

ストリーミング クエリには、ユニオンまたは結合された複数の入力ストリームを含めることができます。 各入力ストリームには、ステートフル操作で許容する必要がある遅延データの異なるしきい値を設定できます。 これらのしきい値は、 各入力ストリームでwithWatermarks("eventTime", delay) します。 次に、 ストリーム/ストリーム結合を使用したクエリの例を示します。

Scala
val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)

クエリの実行中、構造化ストリーミングは各入力ストリームで見られる最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作に使用する単一のグローバル ウォーターマークを選択します。 デフォルトでは、グローバル ウォーターマークとして最小値が選択されます。これは、ストリームの 1 つが他のストリームより遅れた場合 (たとえば、ストリームの 1 つがアップストリーム障害のためにデータの受信を停止した場合)、手遅れになってデータが誤ってドロップされるのを防ぐためです。つまり、グローバル ウォーターマークは最も遅いストリームのペースで安全に移動し、それに応じてクエリ出力が遅延されます。

より速く結果を取得したい場合は、SQL 構成spark.sql.streaming.multipleWatermarkPolicymax (デフォルトはmin ) に設定して、複数のウォーターマーク ポリシーを設定し、最大値をグローバル ウォーターマークとして選択することができます。これにより、グローバル ウォーターマークは最速のストリームのペースで移動できるようになります。ただし、この構成では、最も遅いストリームからのデータが削除されます。Databricks では、この構成を慎重に使用することを推奨しています。

ウォーターマークを個別の操作に適用する

distinct操作は、無制限の状態増加を防ぐためにウォーターマークを必要とするステートフル オペレーターです。ウォーターマークがないと、構造化ストリーミングはすべての一意のレコードを無期限に追跡しようとするため、メモリの問題が発生したり、処理の遅延が長引いたりする可能性があります。

ストリーミング DataFrame にdistinctを適用する場合は、タイムスタンプ フィールドにウォーターマークを指定する必要があります。ウォーターマークは、状態マネージャーが重複排除のレコードを保持する期間を制御します。ウォーターマークしきい値を通過すると、古いレコードは状態から削除されます。

次の例では、 distinct操作にウォーターマークを適用します。

Python
streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)

この例では、最新の観測値eventTimeから 1 時間以内に到着した重複レコードがストリームから削除されます。しきい値を超えると、重複排除の状態情報は削除されます。

重要

すべての列ではなく特定の列で重複を排除する必要がある場合は、 distinctではなくdropDuplicates()またはdropDuplicatesWithinWatermark()を使用します。詳細については次のセクションを参照してください。

ウォーターマーク内に重複をドロップする

Databricks Runtime 13.3 LTS 以降では、一意の識別子を使用して、ウォーターマークしきい値内のレコードの重複を排除できます。

構造化ストリーミングは、1 回だけの処理を保証しますが、データ ソースからレコードを自動的に重複排除しません。 dropDuplicatesWithinWatermark使用すると、指定したフィールドのレコードの重複を排除できるため、一部のフィールド (イベント時間や到着時間など) が異なる場合でも、ストリームから重複を削除できます。

指定したウォーターマーク内に到着した重複レコードは、削除されることが保証されます。 この保証は一方向に対してのみ厳密であり、指定されたしきい値外に到着した重複レコードもドロップされる可能性があります。 重複するイベント間の最大タイムスタンプの差よりも長いウォーターマークの遅延しきい値を設定して、すべての重複を削除する必要があります。

次の例のように、 dropDuplicatesWithinWatermark メソッドを使用するにはウォーターマークを指定する必要があります。

Python
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)