メインコンテンツまでスキップ

透かし入り

このDataFrameのイベント時間ウォーターマークを定義します。 ウォーターマークは、それ以前の時点では遅延データが到着しないと想定される時点を追跡するものです。

構文

withWatermark(eventTime: str, delayThreshold: str)

パラメーター

パラメーター

Type

説明

eventTime

str

行のイベント時刻を含む列の名前。

delayThreshold

str

データが遅れて到着するまでの最小待機時間。これは、処理された最新のレコードを基準とした間隔(例:「1分」または「5時間」)。

戻り値

DataFrame: ウォーターマークDataFrame 。

注意

これは構造化ストリーミング専用の機能です。

Spark 、このウォーターマークをいくつかの目的で使用します。

  • 更新を許可しない出力モードを使用している場合に、特定の時間枠集計がいつ完了し、出力可能になるかを知るため。
  • 継続的な集計処理のために保持する必要のある状態の量を最小限に抑えるため。

現在のウォーターマークは、クエリ内のすべてのパーティションにわたって見られるMAX(eventTime)からユーザーが指定したdelayThresholdを除いたものを調べることによってコンピュートされます。 パーティション間でこの値を調整するコストのため、実際に使用されるウォーターマークは、実際のイベント時刻より少なくともdelayThreshold遅れていることが保証されます。

Python
from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
"value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]