Stream-Stream Joins using Structured Streaming (Scala)

This notebook illustrates different ways of joining streams.

We are going to use the canonical example of ad monetization, where we want to find out which ad impressions led to user clicks. Typically, in such scenarios, there are two streams of data from different sources: ad impressions and ad clicks. Both types of events have a common ad identifier (say, adId), and we want to match clicks with impressions based on the adId. In addition, each event also has a timestamp, which we will use to specify additional conditions in the query to limit the streaming state.

In the absence of actual data streams, we are going to generate fake data streams using the built-in "rate" source, which generates data at a given fixed rate.

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.shuffle.partitions", "1")

val impressions = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .select($"value".as("adId"), $"timestamp".as("impressionTime"))
  
val clicks = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .where((rand() * 100).cast("integer") < 10)       // 10 out of every 100 impressions result in a click
  .select(($"value" - 50).as("adId"), $"timestamp".as("clickTime"))   // -50 so that a click with the same id as an impression is generated later (about 10 seconds at 5 rows per second)
  .where("adId > 0")


import org.apache.spark.sql.functions._
impressions: org.apache.spark.sql.DataFrame = [adId: bigint, impressionTime: timestamp]
clicks: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [adId: bigint, clickTime: timestamp]

Let's see what data these two streaming DataFrames generate.

display(impressions)
adId  impressionTime
0     2018-03-06T04:32:09.076+0000
1     2018-03-06T04:32:09.276+0000
2     2018-03-06T04:32:09.476+0000
3     2018-03-06T04:32:09.676+0000
4     2018-03-06T04:32:09.876+0000
5     2018-03-06T04:32:10.076+0000
6     2018-03-06T04:32:10.276+0000
7     2018-03-06T04:32:10.476+0000
8     2018-03-06T04:32:10.676+0000
9     2018-03-06T04:32:10.876+0000
10    2018-03-06T04:32:11.076+0000
11    2018-03-06T04:32:11.276+0000
12    2018-03-06T04:32:11.476+0000
13    2018-03-06T04:32:11.676+0000
14    2018-03-06T04:32:11.876+0000
15    2018-03-06T04:32:12.076+0000
16    2018-03-06T04:32:12.276+0000
17    2018-03-06T04:32:12.476+0000

Showing all 25 rows.

display(clicks)
adId  clickTime
3     2018-03-06T04:32:31.941+0000
5     2018-03-06T04:32:32.341+0000
8     2018-03-06T04:32:32.941+0000
10    2018-03-06T04:32:33.341+0000
13    2018-03-06T04:32:33.941+0000
15    2018-03-06T04:32:34.341+0000
18    2018-03-06T04:32:34.941+0000
23    2018-03-06T04:32:35.941+0000
25    2018-03-06T04:32:36.341+0000
28    2018-03-06T04:32:36.941+0000
33    2018-03-06T04:32:37.941+0000
35    2018-03-06T04:32:38.341+0000
38    2018-03-06T04:32:38.941+0000
43    2018-03-06T04:32:39.941+0000
45    2018-03-06T04:32:40.341+0000
48    2018-03-06T04:32:40.941+0000
50    2018-03-06T04:32:41.341+0000
53    2018-03-06T04:32:41.941+0000

Showing all 74 rows.

Note:

  • If you get an error saying the join is not supported, the problem may be that you are running this notebook on an older version of Spark (stream-stream joins require Spark 2.3 or later).
  • If you are running on Community Edition, click Cancel above to stop the streams, as you do not have enough cores to run many streams simultaneously.

Inner Join

Let's join these two data streams. This is exactly the same as joining two batch DataFrames/Datasets by their common key adId.

display(impressions.join(clicks, "adId"))
adId  impressionTime                clickTime
3     2018-03-06T04:33:30.031+0000  2018-03-06T04:33:41.372+0000
8     2018-03-06T04:33:31.031+0000  2018-03-06T04:33:42.372+0000
11    2018-03-06T04:33:31.631+0000  2018-03-06T04:33:42.972+0000
13    2018-03-06T04:33:32.031+0000  2018-03-06T04:33:43.372+0000
15    2018-03-06T04:33:32.431+0000  2018-03-06T04:33:43.772+0000
18    2018-03-06T04:33:33.031+0000  2018-03-06T04:33:44.372+0000
23    2018-03-06T04:33:34.031+0000  2018-03-06T04:33:45.372+0000
26    2018-03-06T04:33:34.631+0000  2018-03-06T04:33:45.972+0000
28    2018-03-06T04:33:35.031+0000  2018-03-06T04:33:46.372+0000
30    2018-03-06T04:33:35.431+0000  2018-03-06T04:33:46.772+0000
33    2018-03-06T04:33:36.031+0000  2018-03-06T04:33:47.372+0000
38    2018-03-06T04:33:37.031+0000  2018-03-06T04:33:48.372+0000
41    2018-03-06T04:33:37.631+0000  2018-03-06T04:33:48.972+0000
43    2018-03-06T04:33:38.031+0000  2018-03-06T04:33:49.372+0000
45    2018-03-06T04:33:38.431+0000  2018-03-06T04:33:49.772+0000
48    2018-03-06T04:33:39.031+0000  2018-03-06T04:33:50.372+0000
53    2018-03-06T04:33:40.031+0000  2018-03-06T04:33:51.372+0000
56    2018-03-06T04:33:40.631+0000  2018-03-06T04:33:51.972+0000

Showing all 94 rows.

Note that matched impressions and clicks (their matched timestamps, to be specific) are continuously added to the result table above.

In addition, if you expand the details of the query above, you will find a few timelines of query metrics: the processing rates, the micro-batch durations, and the size of the state. If you keep running this query, you will notice that the state keeps growing without bound. This is because the query must buffer all past input, since any new input can match with any input from the past.
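The same state metrics can also be read programmatically. The following is a minimal sketch, assuming the `impressions` and `clicks` streaming DataFrames defined earlier in this notebook; the query name `unbounded_join_state` and the 30-second run time are arbitrary choices for illustration. Note that the memory sink keeps all output rows on the driver, so it is only suitable for short experiments.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Run the unbounded inner join against an in-memory sink instead of display().
// Assumes `impressions` and `clicks` are the streaming DataFrames defined above.
val query: StreamingQuery = impressions.join(clicks, "adId")
  .writeStream
  .format("memory")
  .queryName("unbounded_join_state")   // hypothetical name, pick any
  .outputMode("append")
  .start()

// Let the query run for about 30 seconds (returns false if it is still running).
query.awaitTermination(30000)

// Each progress report carries one entry per stateful operator (here, the join).
query.recentProgress.foreach { progress =>
  progress.stateOperators.foreach { op =>
    println(s"batch ${progress.batchId}: ${op.numRowsTotal} rows in state, ${op.memoryUsedBytes} bytes")
  }
}

query.stop()
```

For this query, the reported number of state rows should keep increasing from batch to batch, which is the unbounded growth described above.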

Inner Join with Watermarking

To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state. In other words, you will have to do the following additional steps in the join.

  1. Define watermark delays on both inputs such that the engine knows how delayed the input can be.

  2. Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways.

    a. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    b. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).
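The rest of this notebook uses option (a). For comparison, option (b) might look like the sketch below. It assumes the `impressions` and `clicks` streaming DataFrames defined earlier; the 1-minute window size and the names `impressionsByWindow`, `clicksByWindow`, and `joinedByWindow` are illustrative choices, not part of the original notebook. Keep in mind that with tumbling windows a click only matches an impression that falls in the same window, so a click shortly after a window boundary will not match an impression shortly before it.

```scala
import org.apache.spark.sql.functions.{expr, window}

// Assign every impression to a 1-minute tumbling event-time window.
// Assumes `impressions` and `clicks` are the streaming DataFrames defined above.
val impressionsByWindow = impressions
  .withWatermark("impressionTime", "10 seconds")
  .select(
    $"adId".as("impressionAdId"),
    window($"impressionTime", "1 minute").as("impressionWindow"))

// Assign every click to a window of the same size.
val clicksByWindow = clicks
  .withWatermark("clickTime", "20 seconds")
  .select(
    $"adId".as("clickAdId"),
    window($"clickTime", "1 minute").as("clickWindow"))

// Inner join on the ad id and on equality of the event-time windows.
val joinedByWindow = impressionsByWindow.join(
  clicksByWindow,
  expr("clickAdId = impressionAdId AND clickWindow = impressionWindow"))
```

Whether the state is actually cleaned up as expected is best confirmed by watching the state size in the query metrics while the query runs.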

Let's apply these steps to our use case.

  1. Watermark delays: Say, the impressions and the corresponding clicks can be delayed/late in event-time by at most "10 seconds" and "20 seconds", respectively. This is specified in the query as watermark delays using withWatermark.

  2. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 minute after the corresponding impression. This is specified in the query as a join condition between impressionTime and clickTime.
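To see why these two settings bound the state, it helps to work out how long each side has to be buffered in event-time. The sketch below is a simplified model of that reasoning in plain Scala, not Spark's internal computation (the engine derives its own cleanup thresholds from the watermark it tracks, so observed behavior can differ). The case class `JoinTimeBounds` and its field names are hypothetical helpers introduced only for this illustration.

```scala
import java.time.Duration

// Simplified model: how long rows of each input may still be needed for a match.
// All names here are illustrative and not part of any Spark API.
final case class JoinTimeBounds(
    impressionDelay: Duration,  // watermark delay on impressions
    clickDelay: Duration,       // watermark delay on clicks
    minClickLag: Duration,      // earliest a click may follow its impression
    maxClickLag: Duration) {    // latest a click may follow its impression

  // An impression at time t matches clicks up to t + maxClickLag,
  // and such a click may itself arrive up to clickDelay late.
  def impressionRetention: Duration = clickDelay.plus(maxClickLag)

  // A click at time c matches impressions up to c - minClickLag,
  // and such an impression may itself arrive up to impressionDelay late.
  def clickRetention: Duration = impressionDelay.minus(minClickLag)
}

val bounds = JoinTimeBounds(
  impressionDelay = Duration.ofSeconds(10),
  clickDelay = Duration.ofSeconds(20),
  minClickLag = Duration.ZERO,
  maxClickLag = Duration.ofMinutes(1))

println(s"buffer impressions for about ${bounds.impressionRetention.getSeconds} seconds")  // 80
println(s"buffer clicks for about ${bounds.clickRetention.getSeconds} seconds")            // 10
```

Under this model, impressions are needed for roughly 80 seconds of event-time and clicks for roughly 10 seconds, after which they can no longer produce a match and can be dropped from the state.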

// Define watermarks
val impressionsWithWatermark = impressions
  .select($"adId".as("impressionAdId"), $"impressionTime")    
  .withWatermark("impressionTime", "10 seconds")   // max 10 seconds late

val clicksWithWatermark = clicks 
  .select($"adId".as("clickAdId"), $"clickTime")    
  .withWatermark("clickTime", "20 seconds")        // max 20 seconds late


// Inner join with time range conditions
display(
  impressionsWithWatermark.join(
    clicksWithWatermark,
    expr(""" 
      clickAdId = impressionAdId AND 
      clickTime >= impressionTime AND 
      clickTime <= impressionTime + interval 1 minutes    
      """
    )
  )
)
impressionAdId  impressionTime                clickAdId  clickTime
3               2018-03-06T04:35:00.478+0000  3          2018-03-06T04:35:11.478+0000
8               2018-03-06T04:35:01.478+0000  8          2018-03-06T04:35:12.478+0000
11              2018-03-06T04:35:02.078+0000  11         2018-03-06T04:35:13.078+0000
13              2018-03-06T04:35:02.478+0000  13         2018-03-06T04:35:13.478+0000
15              2018-03-06T04:35:02.878+0000  15         2018-03-06T04:35:13.878+0000
18              2018-03-06T04:35:03.478+0000  18         2018-03-06T04:35:14.478+0000
23              2018-03-06T04:35:04.478+0000  23         2018-03-06T04:35:15.478+0000
26              2018-03-06T04:35:05.078+0000  26         2018-03-06T04:35:16.078+0000
28              2018-03-06T04:35:05.478+0000  28         2018-03-06T04:35:16.478+0000
32              2018-03-06T04:35:06.278+0000  32         2018-03-06T04:35:17.278+0000
35              2018-03-06T04:35:06.878+0000  35         2018-03-06T04:35:17.878+0000
38              2018-03-06T04:35:07.478+0000  38         2018-03-06T04:35:18.478+0000
43              2018-03-06T04:35:08.478+0000  43         2018-03-06T04:35:19.478+0000
46              2018-03-06T04:35:09.078+0000  46         2018-03-06T04:35:20.078+0000
48              2018-03-06T04:35:09.478+0000  48         2018-03-06T04:35:20.478+0000
52              2018-03-06T04:35:10.278+0000  52         2018-03-06T04:35:21.278+0000
55              2018-03-06T04:35:10.878+0000  55         2018-03-06T04:35:21.878+0000
58              2018-03-06T04:35:11.478+0000  58         2018-03-06T04:35:22.478+0000

Showing all 448 rows.

We get results similar to those of the previous simple join query. However, if you look at the query metrics now, you will find that after a couple of minutes of running the query, the size of the state stabilizes as the old buffered events start getting cleared.

Outer Joins with Watermarking

Let's extend this use case to illustrate outer joins. Not all ad impressions will lead to clicks, and you may want to keep track of impressions that did not produce clicks. This can be done by applying a left outer join on the impressions and clicks. The joined output will contain not only the impressions with matched clicks, but also the unmatched ones (with the click data being NULL).

While the watermark + event-time constraints are optional for inner joins, they must be specified for left and right outer joins. This is because, to generate the NULL results of an outer join, the engine must know when an input row is not going to match with anything in the future. Hence, the watermark + event-time constraints must be specified for generating correct results.

// Left outer join with time range conditions
display(
  impressionsWithWatermark.join(
    clicksWithWatermark,
    expr(""" 
      clickAdId = impressionAdId AND 
      clickTime >= impressionTime AND 
      clickTime <= impressionTime + interval 1 minutes    
      """
    ),
    "leftOuter"
  )
)
impressionAdId  impressionTime                clickAdId  clickTime
3               2018-03-06T04:40:20.561+0000  3          2018-03-06T04:40:31.450+0000
8               2018-03-06T04:40:21.561+0000  8          2018-03-06T04:40:32.450+0000
11              2018-03-06T04:40:22.161+0000  11         2018-03-06T04:40:33.050+0000
13              2018-03-06T04:40:22.561+0000  13         2018-03-06T04:40:33.450+0000
15              2018-03-06T04:40:22.961+0000  15         2018-03-06T04:40:33.850+0000
18              2018-03-06T04:40:23.561+0000  18         2018-03-06T04:40:34.450+0000
23              2018-03-06T04:40:24.561+0000  23         2018-03-06T04:40:35.450+0000
26              2018-03-06T04:40:25.161+0000  26         2018-03-06T04:40:36.050+0000
28              2018-03-06T04:40:25.561+0000  28         2018-03-06T04:40:36.450+0000
30              2018-03-06T04:40:25.961+0000  30         2018-03-06T04:40:36.850+0000
33              2018-03-06T04:40:26.561+0000  33         2018-03-06T04:40:37.450+0000
38              2018-03-06T04:40:27.561+0000  38         2018-03-06T04:40:38.450+0000
41              2018-03-06T04:40:28.161+0000  41         2018-03-06T04:40:39.050+0000
43              2018-03-06T04:40:28.561+0000  43         2018-03-06T04:40:39.450+0000
45              2018-03-06T04:40:28.961+0000  45         2018-03-06T04:40:39.850+0000
48              2018-03-06T04:40:29.561+0000  48         2018-03-06T04:40:40.450+0000
53              2018-03-06T04:40:30.561+0000  53         2018-03-06T04:40:41.450+0000
56              2018-03-06T04:40:31.161+0000  56         2018-03-06T04:40:42.050+0000

Showing all 209 rows.
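
To keep track of only the impressions that did not produce a click, the left outer join result can be filtered for rows whose click columns are NULL. The following is a minimal sketch, assuming the `impressionsWithWatermark` and `clicksWithWatermark` DataFrames defined earlier; the name `unmatchedImpressions` is an illustrative choice. Unmatched rows appear only after the watermark has advanced far enough for the engine to know that no matching click can still arrive, so expect them to show up with a delay.

```scala
import org.apache.spark.sql.functions.expr

// Left outer join as above, keeping only impressions without a matching click.
// Assumes `impressionsWithWatermark` and `clicksWithWatermark` are defined above.
val unmatchedImpressions = impressionsWithWatermark
  .join(
    clicksWithWatermark,
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 minutes
      """),
    "leftOuter")
  .where($"clickAdId".isNull)                    // NULL click side means no match was found
  .select($"impressionAdId", $"impressionTime")

display(unmatchedImpressions)
```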