ウォーターマークを適用してデータ処理のしきい値を制御する
この記事では、ウォーターマークの基本的な概念を紹介し、一般的なステートフル ストリーミング操作でウォーターマークを使用するための推奨事項を示します。 ステートフル ストリーミング操作にウォーターマークを適用すると、ステートに保持されるデータ量が無限に拡大し、長時間実行されるストリーミング操作中にメモリの問題が発生し、処理の待機時間が長くなるのを防ぐことができます。
ウォーターマークとは何ですか?
構造化ストリーミングでは、ウォーターマークを使用して、特定の状態エンティティの更新の処理を続行する時間のしきい値を制御します。 国家エンティティの一般的な例には、次のようなものがあります。
- 時間枠での集計。
- 2 つのストリーム間の結合の一意のキー。
ウォーターマークを宣言するときは、ストリーミング DataFrame のタイムスタンプ フィールドとウォーターマークのしきい値を指定します。 新しいデータが到着すると、ステートマネージャーは指定されたフィールドの最新のタイムスタンプを追跡し、遅延しきい値内のすべてのレコードを処理します。
次の例では、ウィンドウカウントに10分のウォーターマーク閾値を適用します。
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
この例では、次のようになります。
event_time
列は、10 分間のウォーターマークと 5 分間のタンブリング ウィンドウを定義するために使用されます。- カウントは、重複しない 5 分間のウィンドウごとに観測された
id
ごとに収集されます。 - 状態情報は、ウィンドウの終了時に最新の観測
event_time
より 10 分古いまで、カウントごとに保持されます。
ウォーターマークしきい値は、指定されたしきい値内に到着したレコードが、定義されたクエリのセマンティクスに従って処理されることを保証します。 指定されたしきい値を超えて到着した遅延到着レコードは、クエリメトリクスを使用して処理される可能性がありますが、これは保証されません。
ウォーターマークは処理時間とスループットにどのように影響しますか?
ウォーターマークは出力モードと対話して、データがシンクに書き込まれるタイミングを制御します。 ウォーターマークは処理する状態情報の総量を減らすため、効率的なステートフル ストリーミング スループットにはウォーターマークの効果的な使用が不可欠です。
すべての出力モードがすべてのステートフル操作でサポートされているわけではありません。
ウィンドウ集計のウォーターマークと出力モード
次の表は、ウォーターマークが定義されたタイムスタンプで集計されたクエリの処理の詳細を示しています。
出力モード | 挙動 |
---|---|
追加 | 行は、ウォーターマークのしきい値を超えると、ターゲットテーブルに書き込まれます。 すべての書き込みは、遅延しきい値に基づいて遅延されます。 古い集計状態は、しきい値を超えるとドロップされます。 |
更新 | 行は、結果が計算されるときにターゲット・テーブルに書き込まれ、新しいデータが到着すると更新および上書きできます。 古い集計状態は、しきい値を超えるとドロップされます。 |
完了 | 集計状態は削除されません。 ターゲット・テーブルは、トリガーごとに書き直されます。 |
ストリーム-ストリーム結合のウォーターマークと出力
複数のストリーム間の結合は追加モードのみをサポートし、一致したレコードは検出された各バッチに書き込まれます。 内部結合の場合、 Databricks は各ストリーミング データソースにウォーターマークのしきい値を設定することをお勧めします。 これにより、古いレコードの状態情報を破棄できます。 ウォーターマークがない場合、構造化ストリーミングは、結合の両側からすべてのキーを各トリガーで結合しようとします。
構造化ストリーミングには、外部結合をサポートするための特別なセマンティクスがあります。 ウォーターマークは、一致しなかった後にキーを null 値で書き込む必要がある場合を示すため、外部結合には必須です。 外部ジョインは、データ処理中に一致しないレコードを記録する場合に便利ですが、ジョインは追加操作としてテーブルに書き込むだけなので、この欠落データは遅延しきい値を過ぎるまで記録されないことに注意してください。
構造化ストリーミングの複数のウォーターマークポリシーによる遅延データのしきい値の制御
複数の構造化ストリーミング入力を操作する場合、複数のウォーターマークを設定して、遅延到着データの許容しきい値を制御できます。 ウォーターマークを設定すると、状態情報を制御でき、レイテンシーに影響を与えることができます。
ストリーミング クエリには、ユニオンまたは結合された複数の入力ストリームを含めることができます。 各入力ストリームには、ステートフル操作で許容する必要がある遅延データの異なるしきい値を設定できます。 これらのしきい値は、
各入力ストリームでwithWatermarks("eventTime", delay)
します。 次に、 ストリーム/ストリーム結合を使用したクエリの例を示します。
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
クエリの実行中、構造化ストリーミングは、各入力ストリームで見られる最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作に使用する単一のグローバルウォーターマークを選択します。 デフォルトでは、最小値がグローバル・ウォーターマークとして選択されるのは、ストリームの 1 つが他のストリームより遅れてドロップされた場合 (たとえば、アップストリームの障害によりストリームの 1 つがデータの受信を停止した場合) に、データが誤ってドロップされることがないようにするためです。 つまり、グローバル ウォーターマークは最も遅いストリームのペースで安全に移動し、それに応じてクエリ出力が遅延します。
より高速な結果を得るには、SQL 設定 spark.sql.streaming.multipleWatermarkPolicy
を max
に設定することで、グローバル ウォーターマークとして最大値を選択するように複数ウォーターマーク ポリシーを設定できます (デフォルトは min
です)。 これにより、グローバルウォーターマークは最速のストリームのペースで移動できます。 ただし、この設定では、最も遅いストリームからデータがドロップされます。 このため、Databricks では、この構成を慎重に使用することをお勧めします。
ウォーターマーク内に重複をドロップする
Databricks Runtime 13.3 LTS 以降では、一意の識別子を使用して、ウォーターマークのしきい値内のレコードの重複排除を行うことができます。
構造化ストリーミングは、exactly-once 処理を保証しますが、データソースからレコードを自動的に重複排除するわけではありません。 dropDuplicatesWithinWatermark
を使用して、指定した任意のフィールドのレコードの重複を排除できるため、一部のフィールド (イベント時間や到着時間など) が異なる場合でも、ストリームから重複を削除できます。
指定したウォーターマーク内に到着した重複レコードは、削除されることが保証されます。 この保証は一方向に対してのみ厳密であり、指定されたしきい値外に到着した重複レコードもドロップされる可能性があります。 重複するイベント間の最大タイムスタンプの差よりも長いウォーターマークの遅延しきい値を設定して、すべての重複を削除する必要があります。
次の例のように、 dropDuplicatesWithinWatermark
メソッドを使用するにはウォーターマークを指定する必要があります。
- Python
- Scala
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])