メインコンテンツまでスキップ

ウォーターマーク内の重複を削除

ウォーターマーク内の重複行が削除された新しいDataFrameを返します。オプションで特定の列のみを考慮します。

構文

dropDuplicatesWithinWatermark(subset: Optional[List[str]] = None)

パラメーター

パラメーター

Type

説明

subset

列名のリスト(省略可能)

重複比較に使用する列のリスト(デフォルトは「すべての列」)。

戻り値

DataFrame重複のないDataFrame 。

注意

これはストリーミング DataFrame でのみ機能し、入力 DataFrame のウォーターマークはwithWatermarkを介して設定する必要があります。

ストリーミングDataFrameの場合、これにより、重複した行を削除するための中間状態としてトリガー全体のすべてのデータが保持されます。 状態は、「最も早いイベントと最も遅いイベントの時間差がウォーターマークの遅延閾値よりも小さい限り、イベントは重複排除される」という意味論を保証するために維持されます。ユーザーは、ウォーターマークの遅延しきい値を、重複イベント間の最大タイムスタンプ差よりも長く設定することをお勧めします。

注:ウォーターマークよりも古いデータは削除されます。

Spark Connectに対応しています。

Python
from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
"value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]

df.dropDuplicatesWithinWatermark()

df.dropDuplicatesWithinWatermark(['value'])