Control late data threshold with multiple watermark policy in Structured Streaming

When working with multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late-arriving data. Watermark configuration determines how much state the query retains and affects the latency of results.

A streaming query can have multiple input streams that are unioned or joined together. Each input stream can have a different threshold for the late data that stateful operations must tolerate. Specify these thresholds using withWatermark("eventTime", delay) on each of the input streams. The following example query uses a stream-stream join.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While running the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates a watermark for each stream based on its delay, and chooses a single global watermark from them to use for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, if one of the streams stops receiving data due to upstream failures). In other words, the global watermark safely moves at the pace of the slowest stream, and the query output is delayed accordingly.
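
The selection described above can be modeled in a few lines of plain Scala. This is only an illustrative sketch, not Spark's internal implementation: the object GlobalWatermarkSketch and its functions are hypothetical names, and all times are assumed to be epoch milliseconds.

```scala
// Illustrative model only; these names are hypothetical and are not Spark APIs.
object GlobalWatermarkSketch {
  // Per-stream watermark: maximum event time seen minus the allowed delay (both in ms).
  def streamWatermark(maxEventTimeMs: Long, delayMs: Long): Long =
    maxEventTimeMs - delayMs

  // "min" follows the slowest stream (the default policy); "max" follows the fastest.
  def globalWatermark(watermarksMs: Seq[Long], policy: String = "min"): Long =
    policy match {
      case "min" => watermarksMs.min
      case "max" => watermarksMs.max
      case other => throw new IllegalArgumentException(s"Unknown policy: $other")
    }
}
```

For example, if the first stream has seen events up to 12:00 with a 1 hour delay, its watermark is 11:00. If the second stream has seen events up to 11:30 with a 2 hour delay, its watermark is 09:30. Under the min policy the global watermark is 09:30; under the max policy it is 11:00.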

If you want to get faster results, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min). This lets the global watermark move at the pace of the fastest stream. However, with this setting, data from the slower streams can be dropped as too late. Because of this, we recommend that you use this configuration judiciously.
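
As a minimal sketch, the policy can be set on the session configuration. This assumes an active SparkSession named spark (as in a notebook or spark-shell) and that the setting is applied before the streaming query starts.

```scala
// Assumes `spark` is an existing SparkSession; set this before starting the query.
// "max" lets the global watermark follow the fastest stream; the default is "min".
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
```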