DLTのステートフル処理をウォーターマークで最適化
状態に保持されるデータを効果的に管理するには、DLT でステートフルなストリーム処理 (集計、結合、重複排除など) を実行するときにウォーターマークを使用します。この記事では、DLT クエリでウォーターマークを使用する方法と、推奨される操作の例について説明します。
集計を実行するクエリが増分的に処理され、更新のたびに完全に再計算されないようにするには、ウォーターマークを使用する必要があります。
ウォーターマークとは何ですか?
ストリーム処理では、 ウォーターマーク は、集計などのステートフル操作を実行するときにデータを処理するための時間ベースのしきい値を定義できる Apache Spark 機能です。 到着したデータは、しきい値に達するまで処理され、しきい値に達すると、しきい値で定義された時間枠が閉じられます。 ウォーターマークは、主に大規模なデータセットの処理や実行時間の長い処理など、クエリ処理中の問題を回避するために使用できます。 これらの問題には、結果の生成に長い遅延が発生することや、処理中に状態に保持されるデータの量によるメモリ不足 (OOM) エラーなどがあります。 ストリーミング データは本質的に順序付けられていないため、ウォーターマークは時間枠の集計などの操作の正しい計算もサポートします。
ストリーム処理でのウォーターマークの使用の詳細については、「 Apache Spark 構造化ストリーミングでのウォーターマーク」 および「 データ処理のしきい値を制御するためのウォーターマークの適用」を参照してください。
透かしをどのように定義しますか?
ウォーターマークを定義するには、タイムスタンプ フィールドと、 遅延データが 到着する時間しきい値を表す値を指定します。 データは、定義された時間しきい値より後に到着した場合、遅延と見なされます。 たとえば、しきい値が 10 分と定義されている場合、10 分のしきい値より後に到着したレコードはドロップされる可能性があります。
定義されたしきい値より後に到着したレコードはドロップされる可能性があるため、レイテンシーと正確性の要件を満たすしきい値を選択することが重要です。 しきい値を小さくすると、レコードが早く出力されますが、遅延したレコードがドロップされる可能性も高くなります。 しきい値が大きいほど、待ち時間は長くなりますが、データの完全性が向上する可能性があります。 状態サイズが大きいため、しきい値を大きくすると、追加のコンピューティング リソースも必要になる場合があります。 しきい値はデータと処理の要件によって異なるため、最適なしきい値を決定するには、処理のテストとモニタリングが重要です。
Python の withWatermark()
関数を使用して、ウォーターマークを定義します。 SQL では、 WATERMARK
句を使用してウォーターマークを定義します。
- Python
- SQL
withWatermark("timestamp", "3 minutes")
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
ストリームとストリームの結合でウォーターマークを使用する
ストリーム/ストリーム結合の場合、結合の両側にウォーターマークと時間間隔句を定義する必要があります。 各ジョイン・ソースにはデータの不完全なビューがあるため、これ以上一致が得られなくなったときにストリーミング・エンジンに通知するには、time interval 句が必要です。 時間間隔句では、ウォーターマークの定義に使用したのと同じフィールドを使用する必要があります。
各ストリームでウォーターマークのしきい値が異なる場合があるため、ストリームのしきい値を同じにする必要はありません。 データの欠落を避けるために、ストリーミング エンジンは、最も遅いストリームに基づいて 1 つのグローバル ウォーターマークを保持します。
次の例では、広告インプレッションのストリームと、広告に対するユーザーのクリックのストリームを結合します。 この例では、インプレッションから 3 分以内にクリックが発生する必要があります。 3 分の時間間隔が経過すると、一致しなくなった状態の行は削除されます。
- Python
- SQL
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
ウォーターマークを使用したウィンドウ集約の実行
ストリーミング データに対する一般的なステートフル操作は、ウィンドウ集計です。 ウィンドウ集計は、グループ化集計と似ていますが、定義されたウィンドウの一部である行のセットに対して集計値が返される点が異なります。
ウィンドウは特定の長さとして定義でき、そのウィンドウの一部であるすべての行に対して集計操作を実行できます。 Sparkストリーミングでは、次の 3 種類のウィンドウがサポートされています。
- タンブリング (固定) ウィンドウ : 一連の固定サイズ、オーバーラップしない、連続した時間間隔。 入力レコードは 1 つのウィンドウにのみ属します。
- スライディングウィンドウ : タンブリングウィンドウと同様に、スライディングウィンドウは固定サイズですが、ウィンドウは重なり合うことができ、レコードは複数のウィンドウに収まる可能性があります。
ウィンドウの終わりにウォーターマークの長さを加算した値を超えてデータが到着すると、ウィンドウの新しいデータは受け入れられず、集計の結果が出力され、ウィンドウの状態は削除されます。
次の例では、固定ウィンドウを使用して 5 分ごとのインプレッション数の合計を計算します。 この例では、select 句でエイリアス impressions_window
を使用し、ウィンドウ自体を GROUP BY
句の一部として定義します。 ウィンドウは、ウォーターマークと同じタイムスタンプ列 (この例では clickTimestamp
列) に基づいている必要があります。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Pythonで時間単位の固定ウィンドウで利益を計算する同様の例:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
ストリーミングレコードの重複排除
構造化ストリーミングにはexactly-once処理の保証がありますが、データソースからレコードを自動的に重複排除するわけではありません。 たとえば、多くのメッセージ・キューには少なくとも 1 回の保証があるため、これらのメッセージ・キューの 1 つから読み取るときには、重複するレコードが予想されます。 dropDuplicatesWithinWatermark()
関数を使用すると、指定したフィールドのレコードの重複を排除し、一部のフィールド (イベント時刻や到着時刻など) が異なる場合でもストリームから重複を削除できます。dropDuplicatesWithinWatermark()
関数を使用するには、ウォーターマークを指定する必要があります。ウォーターマークで指定された時間範囲内に到着したすべての重複データは削除されます。
順序付けされたデータは、順序付けされたデータによってウォーターマーク値が誤って先にジャンプする原因となるため、重要です。 その後、古いデータが到着すると、遅延してドロップされたと見なされます。 withEventTimeOrder
オプションを使用して、ウォーターマークで指定されたタイムスタンプに基づいて初期スナップショットを順番に処理します。withEventTimeOrder
オプションは、データセットを定義するコードで宣言することも、 spark.databricks.delta.withEventTimeOrder.enabled
を使用してパイプライン設定で宣言することもできます。例えば:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
withEventTimeOrder
オプションは、Pythonでのみサポートされています。
次の例では、データは clickTimestamp
順に処理され、重複する userId
列と clickAdId
列を含むレコードが互いに 5 秒以内に到着したレコードは削除されます。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
ステートフルな処理のためのパイプライン設定の最適化
本番運用の問題や過度のレイテンシーを防ぐために、 Databricks では、ステートフルストリーム処理で RocksDBベースのステート管理を有効にすることをお勧めします。特に、処理で大量の中間ステートを保存する必要がある場合におすすめです。
RocksDB ベースの状態管理を有効にするには、パイプラインをデプロイする前に次の構成を設定します。
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、「 Databricks での RocksDB 状態ストアの構成」を参照してください。