# Raw streaming source: records read from a Kinesis stream in us-west-2.
# initialPosition='earliest' asks the connector to start from the beginning of
# the stream rather than only new records. Replace <YOUR_STREAM_NAME> before running.
kinesisDataFrame = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='<YOUR_STREAM_NAME>',
        initialPosition='earliest',
        region='us-west-2',
    )
    .load()
)
# Envelope of each Kinesis record. Its field names look like an API Gateway
# proxy request (presumably the producer; verify). Every field is kept as a raw
# string; `body` is parsed again with eventSchema below.
kinesisSchema = (
    StructType()
    .add('body', StringType())
    .add('resource', StringType())
    .add('requestContext', StringType())
    .add('queryStringParameters', StringType())
    .add('httpMethod', StringType())
    .add('pathParameters', StringType())
    .add('headers', StringType())
    .add('stageVariables', StringType())
    .add('path', StringType())
    .add('isBase64Encoded', StringType())
)

# Payload carried in the envelope's `body`: one game event with a timestamp and
# a nested struct of event parameters.
eventSchema = (
    StructType()
    .add('eventName', StringType())
    .add('eventTime', TimestampType())
    .add(
        'eventParams',
        StructType()
        .add('game_keyword', StringType())
        .add('app_name', StringType())
        .add('scoreAdjustment', IntegerType())
        .add('platform', StringType())
        .add('app_version', StringType())
        .add('device_id', StringType())
        .add('client_event_time', TimestampType())
        .add('amount', DoubleType()),
    )
)
%scala
// Remove every entry under the stream output directory (recursively) so the demo
// starts from a clean slate; the directory itself is kept.
// dbutils is a driver-side utility. Databricks warns that calling it from
// executors (as the previous sc.parallelize(...).foreach did) can fail or
// behave unexpectedly, so the deletions are issued from the driver.
dbutils.fs.ls("/path/to/mobile_events_stream/").foreach { entry =>
  dbutils.fs.rm(entry.path, true)
}
# Decode each Kinesis record in two passes:
#   1. the binary `data` payload -> string -> envelope struct (kinesisSchema);
#   2. the envelope's `body` string -> event struct (eventSchema).
# The result has one row per event: eventName, eventTime, eventParams.
# NOTE(review): the envelope's isBase64Encoded flag is not consulted; a
# base64-encoded body would not parse against eventSchema. Confirm the producer
# never encodes it.
gamingEventDF = (
    kinesisDataFrame
    .selectExpr("cast (data as STRING) jsonData")
    .select(from_json('jsonData', kinesisSchema).alias('requestBody'))
    .select(from_json('requestBody.body', eventSchema).alias('body'))
    .select('body.eventName', 'body.eventTime', 'body.eventParams')
)
# Persist parsed events to a Delta table at base_path, partitioned by the
# calendar date of eventTime. Rows without an eventTime are dropped, since they
# could not be assigned a partition date. Streaming progress is checkpointed
# under base_path/_checkpoint.
eventsStream = (
    gamingEventDF
    .where(gamingEventDF.eventTime.isNotNull())
    .withColumn("eventDate", to_date(gamingEventDF.eventTime))
    .writeStream
    .format('delta')
    .partitionBy('eventDate')
    .option('checkpointLocation', base_path + '/_checkpoint')
    .start(base_path)
)
# Event counts per 60-minute tumbling window of eventTime. The 180-minute
# watermark bounds how late an event may arrive and still be counted.
countsDF = (
    gamingEventDF
    .withWatermark("eventTime", "180 minutes")
    .groupBy(window("eventTime", "60 minute"))
    .count()
)

# Expose the aggregate as the in-memory table `incoming_events_counts`, which
# can be queried with SQL from the notebook.
# NOTE(review): no outputMode is set, so Spark's default (append) applies; a
# window should only appear once the watermark passes its end. Confirm this is
# the intended dashboard behavior.
countsQuery = (
    countsDF.writeStream
    .format('memory')
    .queryName('incoming_events_counts')
    .start()
)
# Total purchase amount (sum of eventParams.amount over 'purchaseEvent' events)
# per 60-minute tumbling window of eventTime, with the same 180-minute watermark
# as the event counts.
bookingsDF = (
    gamingEventDF
    .withWatermark("eventTime", "180 minutes")
    .filter(gamingEventDF.eventName == 'purchaseEvent')
    .groupBy(window("eventTime", "60 minute"))
    .sum("eventParams.amount")
)

# Expose the aggregate as the in-memory table `incoming_events_bookings`.
# NOTE(review): as with the counts query, the default output mode (append)
# applies because none is set.
bookingsQuery = (
    bookingsDF.writeStream
    .format('memory')
    .queryName('incoming_events_bookings')
    .start()
)
%sql
-- Daily active users: number of distinct device_id values among events whose
-- eventTime falls on the current date.
SELECT
  COUNT(DISTINCT eventParams.device_id) AS DAU
FROM
  mobile_events_delta_raw
WHERE
  to_date(eventTime) = current_date
# For the sake of example, stop the streaming queries so that compaction and
# vacuum can be demonstrated on the Delta table without an active writer.
# eventsStream is the Delta writer. countsQuery and bookingsQuery feed the
# in-memory tables; stopping them keeps those tables queryable but ends their
# Kinesis reads.
eventsStream.stop()
countsQuery.stop()
bookingsQuery.stop()
Streaming Mobile Game Events
This demo simulates an end-to-end ETL pipeline for mobile game events.
Steps
Last refresh: Never