import org.apache.spark.sql.functions._

// Use a single shuffle partition: the demo streams are tiny (5 rows/second each).
spark.conf.set("spark.sql.shuffle.partitions", "1")

// Simulated ad impressions. The rate source emits an increasing `value` (0, 1, 2, ...)
// together with a `timestamp`, at 5 rows per second; `value` is reused as the ad id.
val impressions = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .select($"value".as("adId"), $"timestamp".as("impressionTime"))

// Simulated ad clicks, from a second, independent rate stream.
// - The random filter keeps roughly 10% of rows, i.e. about 10 out of every 100 impressions
//   result in a click (the exact count varies because it uses rand()).
// - The id is shifted by -50 so that a click with the same id as an impression is generated
//   much later: at 5 rows/second the click for ad id N shows up when this stream reaches
//   value N + 50, i.e. roughly 10 seconds after impression N (assuming both streams start
//   together).
// - The first rows (values 0..50) would get an id <= 0 after the shift; keep positive ids only.
val clicks = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .where((rand() * 100).cast("integer") < 10) // ~10% of rows become clicks
  .select(($"value" - 50).as("adId"), $"timestamp".as("clickTime")) // shift ids by -50 (~10 s later)
  .where("adId > 0")
import org.apache.spark.sql.functions._
impressions: org.apache.spark.sql.DataFrame = [adId: bigint, impressionTime: timestamp]
clicks: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [adId: bigint, clickTime: timestamp]
// Define watermarks.
// The key columns are renamed on each side so the join condition can reference them unambiguously.
val impressionsWithWatermark = impressions
  .select($"adId".as("impressionAdId"), $"impressionTime")
  .withWatermark("impressionTime", "10 seconds") // impressions may arrive at most 10 seconds late

val clicksWithWatermark = clicks
  .select($"adId".as("clickAdId"), $"clickTime")
  .withWatermark("clickTime", "20 seconds") // clicks may arrive at most 20 seconds late

// Inner join with time range conditions: a click matches an impression with the same ad id
// only if it happens within 1 minute after that impression. This time bound, together with
// the watermarks above, is what limits how much streaming state the join has to keep.
display(
  impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 minutes
      """
    )
  )
)
display_query_10(id: 77910fae-78a2-4ddf-9a6f-db01e051d94d)
Last updated: 2567 days ago
// Left outer join with time range conditions, reusing the watermarked streams
// `impressionsWithWatermark` and `clicksWithWatermark`.
// Every impression appears in the output:
// - Impressions with a matching click (same ad id, click within 1 minute after the impression)
//   are joined to that click.
// - For an impression without such a click, Spark emits a row with null click columns. It does
//   so only once the watermarks guarantee that no matching click can still arrive, so these
//   rows show up with a delay.
display(
  impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 minutes
      """
    ),
    "leftOuter"
  )
)
display_query_11(id: 089c7254-0a3d-40d1-8df7-fe43f5f279b1)
Last updated: 2567 days ago
Stream-Stream Joins using Structured Streaming (Scala)
This notebook illustrates different ways of joining streams.
We are going to use the canonical example of ad monetization, where we want to find out which ad impressions led to user clicks. Typically, in such scenarios, there are two streams of data from different sources - ad impressions and ad clicks. Both types of events have a common ad identifier (say, `adId`), and we want to match clicks with impressions based on the `adId`. In addition, each event also has a timestamp, which we will use to specify additional conditions in the query to limit the streaming state.
Last refresh: Never