ウォーターマークを適用してデータ処理のしきい値を制御する
このページでは、ウォーターマークの概念について説明し、一般的なステートフル ストリーミング操作でウォーターマークを使用するための推奨事項を示します。
ストリーミングクエリは、時間の経過とともに状態データを蓄積します。ウォーターマークは、メモリエラーや処理遅延の増加を防ぐために、古い状態データを自動的に削除します。
ウォーターマークとは何ですか?
処理中、構造化ストリーミングはマイクロバッチ間で状態を保持します。ストリーミングクエリは、マイクロバッチごとにすべてを再計算するのではなく、状態を利用して結果を段階的に更新します。ウォーターマークは、クエリが状態エンティティの処理を停止するしきい値を制御します。
状態エンティティの一般的な例には、次のようなものがあります。
- 時間枠での集計。
- 2 つのストリーム間の結合の一意のキー。
DataFrameでウォーターマークを宣言するには、タイムスタンプ フィールドと遅延しきい値を指定します。 新しいデータが到着すると、状態マネージャーは指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のレコードのみを処理します。
クエリは常に、しきい値内に到着したレコードのみを処理します。クエリは、しきい値外に到着したレコードも処理する可能性がありますが、これは保証されません。
次の例では、ウィンドウカウントに10分のウォーターマーク閾値を適用します。
- Python
- Scala
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
この例では、次のようになります。
event_time列は、10 分間のウォーターマークと 5 分間のタンブリング ウィンドウを定義するために使用されます。- 重複しない各5分間のウィンドウにおいて、観測された
idごとにカウントが収集されます。 - 状態情報は、ウィンドウの終了が最新の観測値
event_timeより 10 分以上経過するまで、各カウントに対して保持されます。
groupBy()およびwindow()操作では、イベント時間マーカーが保持されるように、列を名前、 "<colName>"またはcol("<colName>")で参照します。Scalaでは、 $colNameも使用できます。
ウォーターマークは処理時間とスループットにどのような影響を与えますか?
出力モードは、ウォーターマークを使用したクエリがシンクにデータを書き込むタイミングを制御します。 ウォーターマークは、メモリ内の状態情報の総量を削減するため、ステートフルストリーミングにおけるスループット制御に不可欠です。すべてのステートフル操作において、すべての出力モードがサポートされているわけではありません。「ウォーターマークとウィンドウ集計の出力モード」を参照してください。
ウォーターマークの期間の選択にはトレードオフがあります。
- ウォーターマークが短いと、クエリが保存する状態情報が少なくなり、各ウォーターマーク期間が完了した後に結果が書き込まれるため、クエリのレイテンシが低くなります。 ただし、短いウォーターマークは遅延データに対する耐性が低くなります。
- ウォーターマークが長いほど、遅延データに対する耐性が高くなります。 ただし、ウォーターマークが長いと、クエリはより多くの状態情報を保存し、ウォーターマークの継続時間が長くなった後に結果を書き込むまで待機する必要があるため、クエリのレイテンシが増加します。
ウォーターマークとウィンドウ集計の出力モード
次の表は、タイムスタンプとウォーターマークによる集計を含むクエリの処理動作を示しています。
出力モード | 挙動 |
|---|---|
追加 | このクエリは、ウォーターマークのしきい値を超えた後に、対象テーブルに行を書き込みます。すべての書き込みは、遅延しきい値に基づいて遅延されます。しきい値を超えると、古い集計状態は削除されます。 |
更新 | このクエリは、結果が計算されるにつれて対象テーブルに行を書き込み、新しいデータが到着すると行を更新および上書きすることができます。しきい値を超えると、古い集計状態は削除されます。 |
完了 | 集約状態は破棄されません。このクエリは、トリガーごとにターゲットテーブルを書き換えます。 |
ウォーターマークとストリームストリーム結合の出力モード
複数のストリーム間の結合は、追加モードのみをサポートしています。クエリは、バッチごとに一致するレコードを書き込みます。
内部結合の場合、Databricksは、各ストリーミングデータソースにウォーターマークしきい値を設定して、クエリが古いレコードの状態情報を破棄できるようにすることを推奨しています。ウォーターマークがない場合、構造化ストリーミングは各トリガーで結合の両側からすべてのキーを結合しようとしますが、これはパフォーマンスに影響を与える可能性があります。
外部結合の場合、ウォーターマークは必須です。 レコードが一致しない場合、クエリはそのキーに対してnull値を書き込みます。結合は追加モードのみをサポートしているため、一致しないレコードは遅延しきい値を超えるまで書き込まれません。
複数のウォーターマーク ポリシーで遅延データのしきい値を制御する
複数の構造化ストリーミング入力の場合、遅延データの許容しきい値を制御するために、複数のウォーターマークを設定できます。ウォーターマークを使用すると、状態情報と遅延を制御できます。
ストリーミングクエリは、複数の入力ストリームを結合または統合することができます。ステートフルな操作の場合、各入力ストリームごとに、遅延データ許容度に関する異なる閾値が必要になる可能性があります。各入力ストリームでwithWatermark("eventTime", delay)を使用してこれらのしきい値を指定します。以下は、ストリーム-ストリーム結合を含むクエリの例です。
- Python
- Scala
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
ステートフル操作でクエリを実行している間、構造化ストリーミングは各入力ストリームの最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、単一のグローバル ウォーターマークを決定します。 デフォルトでは、構造化ストリーミングは最小値をグローバルウォーターマークとして使用します。あるストリームが他のストリームより遅れた場合、最小グローバルウォーターマークによって、クエリが誤ってデータを遅延データとしてマークすることを防ぎます。例えば、上流側の障害により、いずれかのストリームがデータを受信しなくなった場合に、このような事態が発生する可能性があります。グローバルウォーターマークは、最も遅いストリームの速度に合わせて安全に移動し、必要に応じてクエリの出力を遅延させます。
遅延を減らすには、 spark.sql.streaming.multipleWatermarkPolicy maxに設定してください (デフォルトはminです)。これにより、最速のストリームのウォーターマークがグローバルウォーターマークとして使用されます。しかし、この構成では、最も速度の遅いストリームからのデータが失われます。Databricksは、この設定を適用する際には注意が必要であることを推奨します。
ウォーターマークを個別の操作に適用する
distinct操作は、ステート内のすべての一意のレコードを追跡します。ウォーターマークがないと、状態が無限に増大し、メモリの問題が発生する可能性があります。 タイムスタンプ フィールドにウォーターマークを指定して状態を制限し、しきい値を超えた後に古いレコードを削除します。
次の例では、 distinct操作にウォーターマークを適用します。
- Python
- Scala
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
この例では、ストリーミングクエリは、最後に観測されたeventTimeから 1 時間以内に到着した重複レコードを削除します。クエリは、しきい値を超えると重複排除のための状態情報を削除します。
すべての列ではなく特定の列を重複排除するには、 distinctの代わりにdropDuplicates()またはdropDuplicatesWithinWatermark()を使用します。「ウォーターマーク内の重複を削除」を参照してください。
ウォーターマーク内に重複をドロップする
Databricks Runtime 13.3 LTS以降では、一意の識別子を使用して、ウォーターマークのしきい値内のレコードの重複を排除できます。
構造化ストリーミングは、厳密に1回限りの処理を保証しますが、データソースからのレコードの重複排除は行いません。dropDuplicatesWithinWatermark使用すると、イベント時間や到着時間など、重複レコード間でフィールドが異なる場合でも、任意のフィールドの重複を削除できます。
dropDuplicatesWithinWatermarkの場合、クエリは常にウォーターマークのしきい値内に到着したレコードの重複排除を行います。クエリは、しきい値外に到着したレコードの重複排除を行う場合もありますが、これは保証されません。クエリが重複をすべて削除することを保証するには、ウォーターマークのしきい値を、重複イベント間の最大タイムスタンプ差よりも大きく設定してください。
dropDuplicatesWithinWatermarkメソッドを使用するには、ウォーターマークを指定する必要があります。
- Python
- Scala
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
使用例
以下の例は、高度なウィンドウ処理の使用例を示しています。
タンブリングウィンドウを使用して、時間ごとの売上合計を計算します。
タンブリングウィンドウは、サイズが固定されており、間隔が重ならない。各入力行は、正確に1つのウィンドウに属します。タンブリング ウィンドウを使用して、時間ごとの売上合計などの離散期間集計をコンピュートします。
- Python
- Scala
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
この例では、次のようになります。
window("timestamp", "1 hour")注文を重複しない1時間間隔にグループ化します。例えば、午前5時から6時、午前6時から7時などです。withWatermark("timestamp", "1 hour")各ウィンドウの集計値は、ウィンドウ終了タイムスタンプが最大注文タイムスタンプより1時間古くなるまで、その状態を維持します。
スライディングウィンドウを使用して、転がり骨材を計算します。
スライド式窓は、サイズが固定されており、間隔を空けて重ね合わせることができる。1つの行は複数のウィンドウに属することができます。スライディング ウィンドウを使用して、6 時間周期の売上などのローリング集計を計算します。
- Python
- Scala
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
この例では、次のようになります。
window("timestamp", "6 hours", slideDuration="1 hour")注文は6時間間隔で、1時間ずつ進みます。例えば、午前5時から午前11時、午前6時から午後12時などです。withWatermark("timestamp", "1 hour")各ウィンドウの集計値は、ウィンドウ終了タイムスタンプが最大注文タイムスタンプより1時間古くなるまで、その状態を維持します。slideDurationwindowDuration以下でなければなりません。
セッションウィンドウを使用してユーザーのアクティビティを確認します
セッションウィンドウのサイズは固定されていません。行が到着するとウィンドウが開き、新しい行が一定期間存在しない状態が続くとウィンドウが閉じます。セッションウィンドウを使用して、長時間のアイドル期間の合間に発生するアクティビティの急増を集約します。たとえば、30分以内のユーザーのページビュー数などを集計します。
- Python
- Scala
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
この例では、次のようになります。
session_window("timestamp", gapDuration="30 minutes")最初のページビューが到着した際にウィンドウが開きます。30分以内に後続のページビューが発生するたびに、有効期間が延長されます。30分以内にページビューがない場合、ウィンドウは閉じられ、次のページビューは新しいウィンドウで開始されます。withWatermark("timestamp", "1 hour")ウィンドウ終了タイムスタンプが最大ページビュータイムスタンプより1時間古くなるまで、各セッションの集計値を保持します。window()とsession_window()のtimeColumn引数はTimestampTypeまたはTimestampNTZTypeでなければなりません。- イベント時間ではなく処理時間に基づいてウィンドウを定義するには、
current_timestamp()使用します。 - ウィンドウの期間は、マイクロ秒から数日まで設定できます。1か月以上の期間はサポートされていません。
- ウィンドウ集計で
complete出力モードを使用すると、すべてのウィンドウ状態を無期限に保持できます。適切なウォーターマークを使用してappend出力モードを使用することで、状態の増加を制限し、大規模なデータセットにおけるメモリの問題を防止できます。出力モードの動作の詳細については、 「ウォーターマークとウィンドウ集計の出力モード」を参照してください。