Apply watermarks to control data processing thresholds

This article introduces the basic concepts of watermarking and provides recommendations for using watermarks in common stateful streaming operations. You must apply watermarks to stateful streaming operations to avoid infinitely expanding the amount of data kept in state, which could introduce memory issues and increase processing latencies during long-running streaming operations.

What is a watermark?

Structured Streaming uses watermarks to control the threshold for how long to continue processing updates for a given state entity. Common examples of state entities include:

  • Aggregations over a time window.

  • Unique keys in a join between two streams.

When you declare a watermark, you specify a timestamp field and a watermark threshold on a streaming DataFrame. As new data arrives, the state manager tracks the most recent timestamp in the specified field and processes all records within the lateness threshold.

The following example applies a 10 minute watermark threshold to a windowed count:

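from pyspark.sql.functions import window

# df is assumed to be a streaming DataFrame with event_time and id columns
(df
  .withWatermark("event_time", "10 minutes")  # tolerate records up to 10 minutes late
  .groupBy(
    window("event_time", "5 minutes"),        # 5 minute tumbling window
    "id")
  .count()
)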

In this example:

  • The event_time column is used to define a 10 minute watermark and a 5 minute tumbling window.

  • A count is collected for each id observed in each non-overlapping 5 minute window.

  • State information is maintained for each count until the end of the window is 10 minutes older than the latest observed event_time.
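
The eviction rule described in the list above can be sketched in plain Python, without Spark. This is a simplified model under stated assumptions, not the Structured Streaming implementation: it advances the watermark after every record rather than once per micro-batch, and the names process, window_start, and at are hypothetical helpers introduced only for this illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)       # tumbling window length
THRESHOLD = timedelta(minutes=10)   # watermark threshold
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Return the start of the tumbling window that contains ts."""
    return ts - (ts - EPOCH) % WINDOW


def process(events):
    """Count records per (window start, id) and evict state behind the watermark.

    events: iterable of (event_time, id) pairs in arrival order.
    Returns (open_state, finalized, dropped).
    """
    state, finalized, dropped = {}, {}, []
    max_event_time = None
    for event_time, key in events:
        start = window_start(event_time)
        # Too late: the window ended at or before the current watermark.
        if max_event_time is not None and start + WINDOW <= max_event_time - THRESHOLD:
            dropped.append((event_time, key))
            continue
        state[(start, key)] = state.get((start, key), 0) + 1
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
        watermark = max_event_time - THRESHOLD
        # Evict every window whose end is at or before the watermark.
        for window_key in [k for k in state if k[0] + WINDOW <= watermark]:
            finalized[window_key] = state.pop(window_key)
    return state, finalized, dropped


def at(hour, minute):
    """Hypothetical helper: build a timestamp on an arbitrary fixed day."""
    return datetime(2024, 1, 1, hour, minute)


events = [
    (at(12, 0), "a"), (at(12, 3), "a"), (at(12, 7), "b"),
    (at(12, 16), "a"),  # moves the watermark to 12:06
    (at(12, 4), "a"),   # its window ended at 12:05, behind the watermark
    (at(12, 8), "b"),   # late, but its window (ends 12:10) is still open
]
state, finalized, dropped = process(events)
```

In this run, the 12:16 record moves the watermark to 12:06, which closes the 12:00 to 12:05 window. The 12:04 record that arrives afterward is discarded, while the 12:08 record is still counted because its window has not yet fallen behind the watermark.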

Important

Watermark thresholds guarantee that records arriving within the specified threshold are processed according to the semantics of the defined query. Records that arrive outside the specified threshold might still be processed using query metrics, but this is not guaranteed.

How do watermarks impact processing time and throughput?

Watermarks interact with output modes to control when data is written to the sink. Because watermarks reduce the total amount of state information to be processed, effective use of watermarks is essential for efficient stateful streaming throughput.

Note

Not all output modes are supported for all stateful operations.

Watermarks and output mode for windowed aggregations

The following table details processing for queries with aggregation on a timestamp with a watermark defined:

Output mode | Behavior
Append | Rows are written to the target table once the watermark threshold has passed. All writes are delayed based on the lateness threshold. Old aggregation state is dropped once the threshold has passed.
Update | Rows are written to the target table as results are calculated, and can be updated and overwritten as new data arrives. Old aggregation state is dropped once the threshold has passed.
Complete | Aggregation state is not dropped. The target table is rewritten with each trigger.
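
The difference between append and update mode can be illustrated with a small plain-Python simulation of a windowed count. This is a sketch under simplifying assumptions, not Spark code: event times are integers in minutes, the watermark is applied at the end of the trigger that advances it, and the function name run is hypothetical. Complete mode is omitted because it keeps all state and rewrites the full result with each trigger.

```python
from collections import defaultdict

WINDOW = 5       # tumbling window length, in minutes
THRESHOLD = 10   # watermark threshold, in minutes


def run(batches, mode):
    """Simulate what a windowed count writes on each trigger.

    batches: list of micro-batches, each a list of event times in minutes.
    mode: "append" or "update".
    Returns one dict per trigger, mapping window start to the count written.
    """
    if mode not in ("append", "update"):
        raise ValueError("this sketch only models append and update")
    counts = defaultdict(int)      # aggregation state: window start -> count
    watermark = float("-inf")
    outputs = []
    for batch in batches:
        touched = set()
        for t in batch:
            start = t - t % WINDOW
            if start + WINDOW <= watermark:
                continue           # window is behind the watermark: too late
            counts[start] += 1
            touched.add(start)
        if batch:
            # The watermark trails the latest event time and never moves back.
            watermark = max(watermark, max(batch) - THRESHOLD)
        expired = [s for s in counts if s + WINDOW <= watermark]
        if mode == "append":
            out = {s: counts[s] for s in expired}   # write only final results
        else:
            out = {s: counts[s] for s in touched}   # write every changed row
        for s in expired:
            del counts[s]                           # drop old aggregation state
        outputs.append(out)
    return outputs


batches = [[1, 3, 7], [16], [4, 8, 21]]
append_out = run(batches, "append")
update_out = run(batches, "update")
```

With these inputs, update mode writes the counts for windows 0 and 5 on the first trigger, while append mode writes nothing until the second trigger, when the watermark reaches minute 6 and window 0 becomes final. In both modes the record at minute 4 in the third batch is ignored because its window has already expired.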

Watermarks and output for stream-stream joins

Joins between multiple streams only support append mode, and matched records are written in the batch in which they are discovered. For inner joins, Databricks recommends setting a watermark threshold on each streaming data source. This allows state information to be discarded for old records. Without watermarks, Structured Streaming attempts to join every key from both sides of the join with each trigger.

Structured Streaming has special semantics to support outer joins. Watermarking is mandatory for outer joins, as it indicates when a key must be written with a null value after going unmatched. Outer joins can be useful for capturing records that are never matched during data processing. However, because joins only write to tables as append operations, these unmatched records are not written until after the lateness threshold has passed.

Control late data threshold with multiple watermark policy in Structured Streaming

When working with multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late-arriving data. Configuring watermarks allows you to control state information and impacts latency.

A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. Specify these thresholds using withWatermark("eventTime", delay) on each of the input streams. The following is an example query with stream-stream joins.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While running the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates a watermark for each stream based on the corresponding delay, and chooses a single global watermark from them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark safely moves at the pace of the slowest stream and the query output is delayed accordingly.

If you want to get faster results, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min). This lets the global watermark move at the pace of the fastest stream. However, this configuration drops data from the slowest streams. Because of this, Databricks recommends that you use this configuration judiciously.
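
The effect of the two policies can be worked through with a small plain-Python calculation. This is an illustrative sketch, not Spark code: the function global_watermark, the stream names, and the event times (expressed as minutes since midnight) are hypothetical. In a real query the policy is selected through the spark.sql.streaming.multipleWatermarkPolicy configuration described above.

```python
def global_watermark(max_event_times, delays, policy="min"):
    """Combine per-stream watermarks into a single global watermark.

    max_event_times: stream name -> latest event time seen, in minutes.
    delays: stream name -> watermark delay for that stream, in minutes.
    policy: "min" (the default described above) or "max".
    """
    per_stream = {
        name: max_event_times[name] - delays[name] for name in max_event_times
    }
    if policy == "min":
        return min(per_stream.values())
    if policy == "max":
        return max(per_stream.values())
    raise ValueError("policy must be 'min' or 'max'")


# Stream 1 is current (latest event at 10:00) with a 1 hour delay.
# Stream 2 has stalled (latest event at 08:00) with a 2 hour delay.
max_event_times = {"inputStream1": 600, "inputStream2": 480}
delays = {"inputStream1": 60, "inputStream2": 120}

slow = global_watermark(max_event_times, delays, "min")   # paced by stream 2
fast = global_watermark(max_event_times, delays, "max")   # paced by stream 1
```

Here the minimum policy holds the global watermark at 06:00 (minute 360), so state waits for the stalled stream. The maximum policy moves it to 09:00 (minute 540), which means records from the second stream with event times before 09:00 can be treated as too late when that stream resumes.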

Drop duplicates within watermark

In Databricks Runtime 13.1 and above, you can deduplicate records within a watermark threshold using a unique identifier.

Structured Streaming provides exactly-once processing guarantees, but does not automatically deduplicate records from data sources. You can use dropDuplicatesWithinWatermark to deduplicate records on any specified field, allowing you to remove duplicates from a stream even if some fields differ (such as event time or arrival time).

Duplicate records that arrive within the specified watermark are guaranteed to be dropped. This guarantee is strict in only one direction, and duplicate records that arrive outside of the specified threshold might also be dropped. To remove all duplicates, you must set the watermark delay threshold longer than the maximum timestamp difference among duplicate events.

You must specify a watermark to use the dropDuplicatesWithinWatermark method, as in the following example:

# Python
streamingDf = spark.readStream. ...  # columns: guid, eventTime, ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

// Scala
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
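
To see why the delay threshold must exceed the largest timestamp difference among duplicates, consider the following plain-Python model of deduplication state. It is a rough sketch and not the Spark implementation. It assumes that the state for a key is kept until the watermark passes the first-seen event time plus the delay, and it advances the watermark after every record instead of once per micro-batch. The function name and the sample events are hypothetical.

```python
def dedupe_within_watermark(events, delay):
    """Return the records that survive deduplication on guid.

    events: iterable of (guid, event_time) pairs in arrival order,
            with event_time in hours.
    delay: watermark delay threshold, in hours.
    """
    expires_at = {}                 # guid -> time after which state is evicted
    max_event_time = float("-inf")
    emitted = []
    for guid, event_time in events:
        max_event_time = max(max_event_time, event_time)
        watermark = max_event_time - delay
        # Assumed eviction rule: forget keys whose expiry is behind the watermark.
        for old in [g for g, exp in expires_at.items() if exp <= watermark]:
            del expires_at[old]
        if guid in expires_at:
            continue                # duplicate of a key still held in state
        emitted.append((guid, event_time))
        expires_at[guid] = event_time + delay
    return emitted


events = [
    ("a", 0),    # first occurrence of a
    ("a", 5),    # duplicate 5 hours later
    ("b", 30),   # moves the watermark forward
    ("a", 25),   # duplicate 25 hours after the first occurrence
]
short_delay = dedupe_within_watermark(events, delay=10)
long_delay = dedupe_within_watermark(events, delay=30)
```

With a 10 hour delay, the duplicate at hour 5 is removed but the one at hour 25 passes through, because the state for guid a has already been evicted. With a 30 hour delay, which is longer than the 25 hour spread between duplicates, both duplicates are removed.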