ウォーターマークを使用して Lakeflow 宣言型パイプラインでのステートフル処理を最適化
状態に保持されるデータを効果的に管理するには、 Lakeflow 宣言型パイプラインでステートフル ストリーム処理 (集計、結合、重複排除など) を実行するときにウォーターマークを使用します。 この記事では、 Lakeflow 宣言型パイプライン クエリでウォーターマークを使用する方法と、推奨される操作の例について説明します。
集計を実行するクエリが増分的に処理され、更新ごとに完全に再計算されないようにするには、ウォーターマークを使用する必要があります。
透かしとは何ですか?
ストリーム処理において、 ウォーターマークは 、集計などのステートフル操作を実行するときにデータを処理するための時間ベースのしきい値を定義できる Apache Spark 機能です。到着したデータはしきい値に達するまで処理され、しきい値に達した時点で、しきい値によって定義された時間ウィンドウが閉じられます。ウォーターマークは、主に大規模なデータセットや長時間実行される処理を処理する場合に、クエリ処理中の問題を回避できます。これらの問題には、結果を生成する際の長い待ち時間や、処理中に状態に保持されるデータの量が原因で発生するメモリ不足 (OOM) エラーが含まれる可能性があります。ストリーミング データは本質的に順序付けられていないため、ウォーターマークは時間ウィンドウの集計などの正しい計算操作もサポートします。
ストリーム処理でのウォーターマークの使用の詳細については、 Apache Spark構造化ストリーミングでのウォーターマーク」と「データ処理のしきい値を制御するためのウォーターマークの適用」を参照してください。
ウォーターマークをどのように定義しますか?
ウォーターマークを定義するには、タイムスタンプ フィールドと、 遅延データ が到着するまでの時間しきい値を表す値を指定します。定義された時間しきい値より後に到着したデータは遅延しているとみなされます。たとえば、しきい値が 10 分に定義されている場合、10 分のしきい値後に到着したレコードは削除される可能性があります。
定義されたしきい値の後に到着したレコードは削除される可能性があるため、レイテンシーと正確性の要件を満たすしきい値を選択することが重要です。より小さいしきい値を選択すると、レコードはより早く発行されますが、遅いレコードが削除される可能性も高くなります。しきい値が大きいほど待機時間は長くなりますが、データの完全性は高まる可能性があります。状態のサイズが大きいため、しきい値を大きくすると追加のコンピューティング リソースが必要になる場合もあります。しきい値はデータと処理の要件によって異なるため、最適なしきい値を決定するには、処理をテストしてモニタリングすることが重要です。
透かしを定義するには、Python のwithWatermark()
関数を使用します。SQL では、 WATERMARK
句を使用して透かしを定義します。
- Python
- SQL
withWatermark("timestamp", "3 minutes")
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
ストリーム-ストリーム結合でウォーターマークを使用する
ストリーム-ストリーム結合の場合、結合の両側にウォーターマークと時間間隔句を定義する必要があります。 各結合ソースにはデータの不完全なビューがあるため、それ以上一致ができなくなるタイミングをストリーミング エンジンに通知するために時間間隔句が必要です。時間間隔句では、透かしを定義するために使用したのと同じフィールドを使用する必要があります。
各ストリームがウォーターマークに対して異なるしきい値を必要とする場合があるため、ストリームが同じしきい値を持つ必要はありません。 データの欠落を回避するために、ストリーミング エンジンは、最も遅いストリームに基づいて 1 つのグローバル ウォーターマークを維持します。
次の例では、広告インプレッションのストリームと広告に対するユーザーのクリックのストリームを結合します。この例では、インプレッションから 3 分以内にクリックが発生する必要があります。3 分間の時間が経過すると、一致しなくなった状態の行は削除されます。
- Python
- SQL
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
透かし付きのウィンドウ集計を実行する
ストリーミング データに対する一般的なステートフル操作は、ウィンドウ集計です。ウィンドウ集計はグループ化集計に似ていますが、定義されたウィンドウの一部である行セットの集計値が返される点が異なります。
ウィンドウを特定の長さとして定義し、そのウィンドウに含まれるすべての行に対して集計操作を実行できます。Sparkストリーミングは、次の 3 種類のウィンドウをサポートします。
- タンブリング (固定) ウィンドウ : 固定サイズで重複のない連続した時間間隔のシリーズ。入力レコードは 1 つのウィンドウにのみ属します。
- スライディング ウィンドウ : タンブリング ウィンドウと同様に、スライディング ウィンドウは固定サイズですが、ウィンドウは重なり合うことができ、レコードは複数のウィンドウにまたがる場合があります。
データがウィンドウの終了とウォーターマークの長さを超えて到着すると、そのウィンドウでは新しいデータは受け入れられず、集計の結果が発行され、ウィンドウの状態は破棄されます。
次の例では、固定ウィンドウを使用して 5 分ごとにインプレッションの合計を計算します。この例では、select 句は別名impressions_window
を使用し、ウィンドウ自体はGROUP BY
句の一部として定義されています。ウィンドウは、ウォーターマークと同じタイムスタンプ列 (この例ではclickTimestamp
列) に基づいている必要があります。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
1 時間ごとに固定されたウィンドウで利益を計算する Python での同様の例:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
ストリーミングレコードの重複排除
構造化ストリーミングには 1 回限りの処理が保証されていますが、データ ソースからレコードが自動的に重複排除されるわけではありません。 たとえば、多くのメッセージ キューには少なくとも 1 回の保証があるため、これらのメッセージ キューの 1 つから読み取るときに重複したレコードが予想されます。dropDuplicatesWithinWatermark()
関数を使用すると、指定したフィールドのレコードの重複を排除し、一部のフィールド (イベント時間や到着時間など) が異なる場合でもストリームから重複を削除できます。dropDuplicatesWithinWatermark()
関数を使用するには透かしを指定する必要があります。ウォーターマークで指定された時間範囲内に到着する重複データはすべて削除されます。
順序が乱れたデータでは透かし値が誤って先に進んでしまうため、順序付けられたデータは重要です。その後、古いデータが到着すると、遅延していると見なされ、削除されます。ウォーターマークに指定されたタイムスタンプに基づいて初期スナップショットを順番に処理するには、 withEventTimeOrder
オプションを使用します。withEventTimeOrder
オプションは、データセットを定義するコード内、またはspark.databricks.delta.withEventTimeOrder.enabled
を使用してパイプライン設定内で宣言できます。例えば:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
withEventTimeOrder
オプションは Python でのみサポートされます。
次の例では、データはclickTimestamp
順序で処理され、重複するuserId
とclickAdId
列を含む、5 秒以内に到着するレコードは削除されます。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
ステートフル処理のためのパイプライン構成を最適化する
運用上の問題や過度の待機時間を防ぐために、特に処理で大量の中間状態を保存する必要がある場合は、 Databricks 、ステートフル ストリーム処理に対してRocksDBベースの状態管理を有効にすることをお勧めします。
サーバーレス パイプラインは状態ストアの構成を自動的に管理します。
パイプラインをデプロイする前に次の構成を設定することで、RocksDB ベースの状態管理を有効にすることができます。
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB の構成推奨事項を含む RocksDB 状態ストアの詳細については、 「Databricks での RocksDB 状態ストアの構成」を参照してください。