dropDuplicatesWithinWatermark
Return a new DataFrame with duplicate rows removed, optionally only considering certain columns, within watermark.
Syntax
dropDuplicatesWithinWatermark(subset: Optional[List[str]] = None)
Parameters
Parameter | Type | Description |
|---|---|---|
| List of column names, optional | List of columns to use for duplicate comparison (default All columns). |
Returns
DataFrame: DataFrame without duplicates.
Notes
This only works with streaming DataFrame, and watermark for the input DataFrame must be set via withWatermark.
For a streaming DataFrame, this will keep all data across triggers as intermediate state to drop duplicated rows. The state will be kept to guarantee the semantic, "Events are deduplicated as long as the time distance of earliest and latest events are smaller than the delay threshold of watermark." Users are encouraged to set the delay threshold of watermark longer than max timestamp differences among duplicated events.
Note: too late data older than watermark will be dropped.
Supports Spark Connect.
Examples
from pyspark.sql import Row
from pyspark.sql.functions import timestamp_seconds
df = spark.readStream.format("rate").load().selectExpr(
"value % 5 AS value", "timestamp")
df.select("value", df.timestamp.alias("time")).withWatermark("time", '10 minutes')
# DataFrame[value: bigint, time: timestamp]
df.dropDuplicatesWithinWatermark()
df.dropDuplicatesWithinWatermark(['value'])