ウォーターマークを使用してDelta Live Tablesのステートフル処理を最適化する

状態に保持されたデータを効果的に管理するには、集計、結合、重複排除などのDelta Live Tablesでステートフル ストリーム処理を実行するときにウォーターマークを使用します。 この記事では、Delta Live Tables クエリでウォーターマークを使用する方法と、推奨される操作の例について説明します。

注:

集計を実行するクエリが増分的に処理され、更新のたびに完全に再計算されないようにするには、ウォーターマークを使用する必要があります。

ウォーターマークとは何ですか?

ストリーム処理におけるウォーターマークは、集計などのステートフル操作を実行するときにデータを処理するための時間ベースのしきい値を定義できる Apache Spark の機能です。 到着したデータは、しきい値に達するまで処理され、しきい値に達した時点で、しきい値によって定義された時間枠が閉じられます。 ウォーターマークを使用すると、主に大規模なデータセットや長時間実行される処理を処理する場合に、クエリ処理中の問題を回避できます。 これらの問題には、結果を生成する際の長い待機時間や、処理中に状態に保持されるデータの量によるメモリ不足 (OOM) エラーが含まれる場合があります。 ストリーミング データは本質的に順序付けされていないため、ウォーターマークはタイム ウィンドウ集計などの正確な計算操作もサポートします。

ストリーム処理でのウォーターマークの使用の詳細については、 「Apache Spark 構造化ストリーミングでのウォーターマーク」と「データ処理のしきい値を制御するためのウォーターマークの適用」を参照してください。

ウォーターマークをどのように定義しますか?

ウォーターマークを定義するには、タイムスタンプ フィールドと 、遅延データが 到着する時間のしきい値を表す値を指定します。 定義された時間しきい値を過ぎて到着したデータは、遅延していると見なされます。 例えば、しきい値が 10 分と定義されている場合、10 分のしきい値より後に到着するレコードはドロップされる可能性があります。

定義されたしきい値を超えて到着したレコードはドロップされる可能性があるため、レイテンシーと正確性の要件を満たすしきい値を選択することが重要です。 しきい値を小さくすると、レコードの出力が早くなりますが、遅延したレコードがドロップされる可能性が高くなります。 しきい値が大きいほど、待機時間は長くなりますが、データの完全性が高まる可能性があります。 状態サイズが大きくなるため、しきい値を大きくすると追加のコンピューティング リソースも必要になる場合があります。 しきい値はデータと処理の要件によって異なるため、最適なしきい値を決定するには処理のテストとモニタリングが重要です。

ウォーターマークを定義するには、Python のwithWatermark()関数を使用します。 SQL では、 WATERMARK句を使用してウォーターマークを定義します。

withWatermark("timestamp", "3 minutes")
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

ストリームとストリームの結合でウォーターマークを使用する

ストリーム-ストリーム結合の場合、結合の両側にウォーターマークと時間間隔句を定義する必要があります。 各結合ソースにはデータの不完全なビューがあるため、これ以上一致が得られなくなる時期をストリーミング エンジンに通知するために時間間隔句が必要です。 時間間隔句では、ウォーターマークの定義に使用したのと同じフィールドを使用する必要があります。

各ストリームがウォーターマークに対して異なるしきい値を必要とする場合があるため、ストリームのしきい値が同じである必要はありません。 データの欠落を避けるために、ストリーミング エンジンは最も遅いストリームに基づいて 1 つのグローバル ウォーターマークを維持します。

次の例では、広告インプレッションのストリームとユーザーが広告をクリックするストリームを結合します。 この例では、インプレッションから 3 分以内にクリックする必要があります。 3 分の時間間隔が経過すると、一致しなくなった状態の行は削除されます。

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf
CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

ウォーターマーク付きのウィンドウ集計を実行する

ストリーミング データに対する一般的なステートフル操作は、ウィンドウ集計です。 ウィンドウ集計はグループ化された集計と似ていますが、定義されたウィンドウの一部である行セットに対して集計値が返される点が異なります。

ウィンドウは特定の長さとして定義でき、そのウィンドウの一部であるすべての行に対して集計操作を実行できます。 Spark ストリーミングは、次の 3 種類のウィンドウをサポートします。

  • タンブリング (固定) ウィンドウ: 固定サイズで、重なり合わない、連続した一連の時間間隔。 入力レコードは 1 つのウィンドウにのみ属します。

  • スライディング ウィンドウ: タンブリング ウィンドウと同様に、スライディング ウィンドウは固定サイズですが、ウィンドウは重なり合うことができ、レコードは複数のウィンドウにまたがることができます。

データがウィンドウの末尾にウォーターマークの長さを加えた値を超えて到着すると、ウィンドウの新しいデータは受け入れられず、集計の結果が出力され、ウィンドウの状態は削除されます。

次の例では、固定ウィンドウを使用して 5 分ごとのインプレッション数の合計を計算します。 この例では、select 句でエイリアス impressions_windowを使用し、ウィンドウ自体を GROUP BY 句の一部として定義しています。 ウィンドウは、ウォーターマークと同じタイムスタンプ列 (この例では clickTimestamp 列) に基づいている必要があります。

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

時間単位の固定ウィンドウで利益を計算する Python の同様の例:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

ストリーミングレコードの重複を排除する

構造化ストリーミングには 1 回限りの処理が保証されていますが、データ ソースからレコードが自動的に重複排除されるわけではありません。 例えば、多くのメッセージ・キューには少なくとも 1 回保証があるため、これらのメッセージ・キューの 1 つから読み取るときには、重複レコードが予期されます。 dropDuplicatesWithinWatermark()関数を使用すると、指定したフィールドのレコードの重複を排除し、一部のフィールド (イベント時間や到着時間など) が異なる場合でもストリームから重複を削除できます。 dropDuplicatesWithinWatermark() 関数を使用するには、ウォーターマークを指定する必要があります。ウォーターマークで指定された時間範囲内に到着したすべての重複データは削除されます。

順序付けされたデータは、順序付けされたデータによってウォーターマーク値が誤って先にジャンプする原因となるため、重要です。 その後、古いデータが到着すると、遅延してドロップされたと見なされます。 withEventTimeOrder オプションを使用して、ウォーターマークで指定されたタイムスタンプに基づいて初期スナップショットを順番に処理します。withEventTimeOrder オプションは、データセットを定義するコードで宣言することも、 spark.databricks.delta.withEventTimeOrder.enabledを使用してパイプライン設定で宣言することもできます。例えば:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

注:

withEventTimeOrderオプションは Python でのみサポートされます。

次の例では、データは clickTimestampの順に処理され、重複する userId 列と clickAdId 列を含む 5 秒以内に到着したレコードは削除されます。

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

ステートフル処理のためにパイプライン構成を最適化する

本番運用の問題や過度のレイテンシーを防ぐために、 Databricks では、ステートフルストリーム処理で RocksDBベースのステート管理を有効にすることをお勧めします。特に、処理で大量の中間ステートを保存する必要がある場合におすすめです。

Severless パイプラインは、状態ストアの構成を自動的に管理します。

RocksDB ベースの状態管理を有効にするには、パイプラインをデプロイする前に次の構成を設定します。

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、「 Databricks での RocksDB 状態ストアの構成」を参照してください。