mobile-event-stream-etl (Python)

Streaming Mobile Game Events

This demo simulates an end-to-end ETL pipeline for mobile game events.

Steps
  • An event generator sends simulated game events to a REST endpoint (Amazon API Gateway)
  • API Gateway triggers an AWS Lambda function that writes the events to a Kinesis stream (a sketch of such a handler follows this list)
  • Ingest from the Kinesis stream in real time into a Databricks Delta table
  • Perform the necessary transformations to extract real-time KPIs
  • Visualize the KPIs on a dashboard
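The Lambda function behind API Gateway lives outside this notebook. A rough sketch of a proxy-integration handler that forwards the whole request envelope to Kinesis is shown here; the stream name, partition-key choice, and response shape are assumptions for illustration, not the actual demo code.

import json
import boto3

kinesis = boto3.client('kinesis')

def lambda_handler(event, context):
    # 'event' is the API Gateway proxy payload; its 'body' field carries the game event JSON
    body = json.loads(event.get('body') or '{}')
    kinesis.put_record(
        StreamName='<YOUR_STREAM_NAME>',          # assumption: the stream this notebook reads from
        Data=json.dumps(event).encode('utf-8'),   # forward the whole envelope, matching kinesisSchema below
        PartitionKey=str(body.get('eventParams', {}).get('device_id', 'unknown'))
    )
    return {'statusCode': 200, 'body': json.dumps({'status': 'ok'})}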

Setup Instructions
  • Start a cluster with the IAM role "syu_mobile_streaming_demo_role"
  • Start the Mobile ETL Event Generator (a sketch of what such a generator might look like appears right after these instructions)
  • Clear out the target Databricks Delta table using the parallelized reset code further down
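The event generator itself also lives outside this notebook. As a minimal sketch (the endpoint URL and all event names other than purchaseEvent are made up for illustration, shaped to match the eventSchema defined below):

import json
import random
import time
from datetime import datetime, timezone

import requests

API_URL = 'https://<YOUR_API_ID>.execute-api.us-west-2.amazonaws.com/prod/events'  # hypothetical endpoint

def send_event():
    # Build one fake game event that matches eventSchema
    event = {
        'eventName': random.choice(['purchaseEvent', 'scoreAdjustEvent', 'startLevelEvent']),
        'eventTime': datetime.now(timezone.utc).isoformat(),
        'eventParams': {
            'game_keyword': 'my_game',
            'app_name': 'mobile_demo',
            'scoreAdjustment': random.randint(-10, 10),
            'platform': random.choice(['ios', 'android']),
            'app_version': '1.0.0',
            'device_id': 'device_{}'.format(random.randint(1, 500)),
            'client_event_time': datetime.now(timezone.utc).isoformat(),
            'amount': round(random.uniform(0.99, 9.99), 2),
        },
    }
    requests.post(API_URL, json=event)

# Fire a steady trickle of events at the endpoint
while True:
    send_event()
    time.sleep(0.5)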
Set Up the Kinesis Stream Reader
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Read the raw stream with the Databricks Kinesis source; fill in your own stream name and region
kinesisDataFrame = spark \
  .readStream \
  .format('kinesis') \
  .option('streamName', '<YOUR_STREAM_NAME>') \
  .option('initialPosition', 'earliest') \
  .option('region', 'us-west-2') \
  .load()
Defining Incoming Data Schemas
# Schema of the API Gateway request envelope that wraps each incoming event
kinesisSchema = StructType() \
            .add('body', StringType()) \
            .add('resource', StringType()) \
            .add('requestContext',StringType()) \
            .add('queryStringParameters', StringType()) \
            .add('httpMethod', StringType()) \
            .add('pathParameters', StringType()) \
            .add('headers', StringType()) \
            .add('stageVariables', StringType()) \
            .add('path', StringType()) \
            .add('isBase64Encoded', StringType())

# Schema of the game event payload carried in the request 'body'
eventSchema = StructType().add('eventName', StringType()) \
              .add('eventTime', TimestampType()) \
              .add('eventParams', StructType() \
                   .add('game_keyword', StringType()) \
                   .add('app_name', StringType()) \
                   .add('scoreAdjustment', IntegerType()) \
                   .add('platform', StringType()) \
                   .add('app_version', StringType()) \
                   .add('device_id', StringType()) \
                   .add('client_event_time', TimestampType()) \
                   .add('amount', DoubleType())
                  )            
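To make the two schemas concrete: each Kinesis record is the API Gateway proxy envelope (matched by kinesisSchema), and its body field is itself a JSON string holding the game event (matched by eventSchema). A hypothetical record, with most envelope fields omitted, might look like this:

# Illustrative only; the values and the omitted envelope fields are assumptions
sample_record = {
    'resource': '/events',
    'httpMethod': 'POST',
    'path': '/events',
    'body': '{"eventName": "purchaseEvent", '
            '"eventTime": "2018-02-15T12:34:56Z", '
            '"eventParams": {"device_id": "device_42", "platform": "ios", "amount": 4.99}}'
}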
Code to reset environment
%sql

CREATE TABLE IF NOT EXISTS mobile_events_delta_raw
USING DELTA
LOCATION '/path/to/mobile_events_stream/';
%scala
// Delete everything under the Delta path, running the removes in parallel across the cluster
sc.parallelize(dbutils.fs.ls("/path/to/mobile_events_stream/").map(_.path)).foreach { path =>
  dbutils.fs.rm(path, true)
}
%sql REFRESH TABLE mobile_events_delta_raw
Read incoming stream into Databricks Delta table
# Peel off the API Gateway envelope, then the event body, keeping only the fields we need
gamingEventDF = kinesisDataFrame.selectExpr("cast (data as STRING) jsonData") \
  .select(from_json('jsonData', kinesisSchema).alias('requestBody')) \
  .select(from_json('requestBody.body', eventSchema).alias('body')) \
  .select('body.eventName', 'body.eventTime', 'body.eventParams')
# Write the parsed events to the Delta table, partitioned by event date
base_path = '/path/to/mobile_events_stream/'
eventsStream = gamingEventDF.filter(gamingEventDF.eventTime.isNotNull()) \
  .withColumn("eventDate", to_date(gamingEventDF.eventTime)) \
  .writeStream \
  .partitionBy('eventDate') \
  .format('delta') \
  .option('checkpointLocation', base_path + '_checkpoint') \
  .start(base_path)
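Before moving on, it can be worth confirming that the ingest query is actually making progress. The StreamingQuery handle returned by start() exposes its current status and most recent progress report:

# lastProgress is None until the first micro-batch completes
print(eventsStream.status)
print(eventsStream.lastProgress)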
%sql SELECT eventName, count(*) FROM mobile_events_delta_raw GROUP BY 1 ORDER BY 2 DESC

Analytics, KPIs, Oh My!

Events in the last hour?
countsDF = gamingEventDF \
  .withWatermark("eventTime", "180 minutes") \
  .groupBy(window("eventTime", "60 minute")) \
  .count()
countsQuery = countsDF.writeStream \
  .format('memory') \
  .queryName('incoming_events_counts') \
  .start()
display(countsDF)
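Because the memory sink registers its output as an in-memory table under the query name, the same counts can also be pulled with SQL once a few micro-batches have completed:

%sql SELECT * FROM incoming_events_counts ORDER BY window.start DESC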
Bookings in the last hour?
bookingsDF = gamingEventDF \
  .withWatermark("eventTime", "180 minutes") \
  .filter(gamingEventDF.eventName == 'purchaseEvent') \
  .groupBy(window("eventTime", "60 minute")) \
  .sum("eventParams.amount")
bookingsQuery = bookingsDF.writeStream \
  .format('memory') \
  .queryName('incoming_events_bookings') \
  .start()
display(bookingsDF)
How about DAU (Daily Active Users)?
%sql SELECT count(DISTINCT eventParams.device_id) AS DAU FROM mobile_events_delta_raw WHERE to_date(eventTime) = current_date
OK, how about the housekeeping?

A common problem with streaming is that you end up with a lot of small files. If only there were a way to perform compaction...

# For the sake of example, let's stop the streaming queries to demonstrate compaction and vacuum
eventsStream.stop()
countsQuery.stop()
bookingsQuery.stop()

%fs ls /path/to/mobile_events_stream/eventDate=2018-02-15/

...and here it is!

%sql OPTIMIZE '/path/to/mobile_events_stream/'
%fs ls /path/to/mobile_events_stream/eventDate=2018-02-15/
Great, now how about cleaning up those old files?

NOTE: Do not use a retention of 0 hours in production, as this may affect queries that are currently in flight. By default this value is 7 days. We're using 0 hours here for purposes of demonstration only.

%sql
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM '/path/to/mobile_events_stream/' RETAIN 0 HOURS;
%fs ls /path/to/mobile_events_stream/eventDate=2018-02-15/
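When the demo is finished, it's worth re-enabling the safety check so a later VACUUM can't silently run with a short retention window:

%sql SET spark.databricks.delta.retentionDurationCheck.enabled = true;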