graph-analysis-graphx(Scala)

Graph Analysis with GraphX

In this notebook we'll go over basic graph analysis using the GraphX API and show how to use it to analyze a real dataset. We're going to be doing this with publicly available bike data from the Bay Area Bike Share portal, and we'll specifically be analyzing the second year of data.

Note: GraphX computation is only supported using the Scala and RDD APIs.

Graph Theory and Graph Processing

Graph processing is an important aspect of analysis that applies to a lot of use cases. Fundamentally, graph theory and graph processing are about defining relationships between nodes and edges. Nodes, or vertices, are the individual units, while edges are the relationships defined between them. This works great for social network analysis and for running algorithms like PageRank to better understand and weigh relationships.

Some business use cases include finding the central people in social networks [who is most popular in a group of friends], the importance of papers in bibliographic networks [which papers are most referenced], and of course ranking web pages!
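To make the vertex and edge vocabulary concrete, here is a minimal toy sketch of a GraphX graph. The people and relationships are made up purely for illustration and aren't part of the bike data we'll use below.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Vertices are (id, attribute) pairs; here each person is a node
val people: RDD[(VertexId, String)] = spark.sparkContext.parallelize(Seq(
  (1L, "Alice"), (2L, "Bob"), (3L, "Carol")))

// Edges are directed relationships between vertex ids, carrying their own attribute
val follows: RDD[Edge[String]] = spark.sparkContext.parallelize(Seq(
  Edge(1L, 2L, "follows"), Edge(3L, 1L, "follows")))

// Any edge that references a missing vertex falls back to the default attribute
val toyGraph = Graph(people, follows, "Unknown")

toyGraph.triplets.collect.foreach(t => println(s"${t.srcAttr} ${t.attr} ${t.dstAttr}"))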

Graphs and Bike Trip Data

As mentioned, in this example we'll be using Bay Area Bike Share data. This data is freely available to the public through the portal mentioned above. We'll orient our analysis by making every station a vertex and every trip an edge connecting two stations. This creates a directed graph.

Table of Contents

  • Setup & Data
  • Imports
  • Building the Graph
  • PageRank
  • Trips from Station to Station
  • In Degrees and Out Degrees

Create DataFrames

val bikeStations = spark.sql("SELECT * FROM station_csv")
val tripData = spark.sql("SELECT * FROM trip_csv")
bikeStations: org.apache.spark.sql.DataFrame = [id: string, name: string ... 5 more fields]
tripData: org.apache.spark.sql.DataFrame = [id: string, duration: string ... 9 more fields]
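These queries assume the station_csv and trip_csv tables are already registered in the metastore. If you only have the raw CSV files, a sketch like the following would produce equivalent DataFrames; the paths are placeholders (not part of the original notebook), and we assume the files have a header row.

// Sketch: load the raw CSVs directly; adjust the paths to wherever the files live
val bikeStations = spark.read
  .option("header", "true")
  .csv("/path/to/station.csv")   // hypothetical path
val tripData = spark.read
  .option("header", "true")
  .csv("/path/to/trip.csv")      // hypothetical path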
display(bikeStations)
 
| id | name | lat | long | dock_count | city | installation_date |
|---|---|---|---|---|---|---|
| 2 | San Jose Diridon Caltrain Station | 37.329732 | -121.90178200000001 | 27 | San Jose | 8/6/2013 |
| 3 | San Jose Civic Center | 37.330698 | -121.888979 | 15 | San Jose | 8/5/2013 |
| 4 | Santa Clara at Almaden | 37.333988 | -121.894902 | 11 | San Jose | 8/6/2013 |
| 5 | Adobe on Almaden | 37.331415 | -121.8932 | 19 | San Jose | 8/5/2013 |
| 6 | San Pedro Square | 37.336721000000004 | -121.894074 | 15 | San Jose | 8/7/2013 |
| 7 | Paseo de San Antonio | 37.333798 | -121.88694299999999 | 15 | San Jose | 8/7/2013 |
| 8 | San Salvador at 1st | 37.330165 | -121.88583100000001 | 15 | San Jose | 8/5/2013 |
| 9 | Japantown | 37.348742 | -121.89471499999999 | 15 | San Jose | 8/5/2013 |
| 10 | San Jose City Hall | 37.337391 | -121.886995 | 15 | San Jose | 8/6/2013 |
| 11 | MLK Library | 37.335885 | -121.88566000000002 | 19 | San Jose | 8/6/2013 |
| 12 | SJSU 4th at San Carlos | 37.332808 | -121.88389099999999 | 19 | San Jose | 8/7/2013 |
| 13 | St James Park | 37.339301 | -121.88993700000002 | 15 | San Jose | 8/6/2013 |
| 14 | Arena Green / SAP Center | 37.332692 | -121.900084 | 19 | San Jose | 8/5/2013 |
| 16 | SJSU - San Salvador at 9th | 37.333954999999996 | -121.877349 | 15 | San Jose | 8/7/2013 |
| 21 | Franklin at Maple | 37.481758 | -122.226904 | 15 | Redwood City | 8/12/2013 |
| 22 | Redwood City Caltrain Station | 37.486078000000006 | -122.23208899999999 | 25 | Redwood City | 8/15/2013 |
| 23 | San Mateo County Center | 37.487615999999996 | -122.229951 | 15 | Redwood City | 8/15/2013 |
Showing all 70 rows.

display(tripData)
 
| id | duration | start_date | start_station_name | start_station_id | end_date | end_station_name | end_station_id | bike_id | subscription_type | zip_code |
|---|---|---|---|---|---|---|---|---|---|---|
| 4576 | 63 | 8/29/2013 14:13 | South Van Ness at Market | 66 | 8/29/2013 14:14 | South Van Ness at Market | 66 | 520 | Subscriber | 94127 |
| 4607 | 70 | 8/29/2013 14:42 | San Jose City Hall | 10 | 8/29/2013 14:43 | San Jose City Hall | 10 | 661 | Subscriber | 95138 |
| 4130 | 71 | 8/29/2013 10:16 | Mountain View City Hall | 27 | 8/29/2013 10:17 | Mountain View City Hall | 27 | 48 | Subscriber | 97214 |
| 4251 | 77 | 8/29/2013 11:29 | San Jose City Hall | 10 | 8/29/2013 11:30 | San Jose City Hall | 10 | 26 | Subscriber | 95060 |
| 4299 | 83 | 8/29/2013 12:02 | South Van Ness at Market | 66 | 8/29/2013 12:04 | Market at 10th | 67 | 319 | Subscriber | 94103 |
| 4927 | 103 | 8/29/2013 18:54 | Golden Gate at Polk | 59 | 8/29/2013 18:56 | Golden Gate at Polk | 59 | 527 | Subscriber | 94109 |
| 4500 | 109 | 8/29/2013 13:25 | Santa Clara at Almaden | 4 | 8/29/2013 13:27 | Adobe on Almaden | 5 | 679 | Subscriber | 95112 |
| 4563 | 111 | 8/29/2013 14:02 | San Salvador at 1st | 8 | 8/29/2013 14:04 | San Salvador at 1st | 8 | 687 | Subscriber | 95112 |
| 4760 | 113 | 8/29/2013 17:01 | South Van Ness at Market | 66 | 8/29/2013 17:03 | South Van Ness at Market | 66 | 553 | Subscriber | 94103 |
| 4258 | 114 | 8/29/2013 11:33 | San Jose City Hall | 10 | 8/29/2013 11:35 | MLK Library | 11 | 107 | Subscriber | 95060 |
| 4549 | 125 | 8/29/2013 13:52 | Spear at Folsom | 49 | 8/29/2013 13:55 | Embarcadero at Bryant | 54 | 368 | Subscriber | 94109 |
| 4498 | 126 | 8/29/2013 13:23 | San Pedro Square | 6 | 8/29/2013 13:25 | Santa Clara at Almaden | 4 | 26 | Subscriber | 95112 |
| 4965 | 129 | 8/29/2013 19:32 | Mountain View Caltrain Station | 28 | 8/29/2013 19:35 | Mountain View Caltrain Station | 28 | 140 | Subscriber | 94041 |
| 4557 | 130 | 8/29/2013 13:57 | 2nd at South Park | 64 | 8/29/2013 13:59 | 2nd at South Park | 64 | 371 | Subscriber | 94122 |
| 4386 | 134 | 8/29/2013 12:31 | Clay at Battery | 41 | 8/29/2013 12:33 | Beale at Market | 56 | 503 | Subscriber | 94109 |
| 4749 | 138 | 8/29/2013 16:57 | Post at Kearney | 47 | 8/29/2013 16:59 | Post at Kearney | 47 | 408 | Subscriber | 94117 |
| 4242 | 141 | 8/29/2013 11:25 | San Jose City Hall | 10 | 8/29/2013 11:27 | San Jose City Hall | 10 | 26 | Subscriber | 95060 |

Truncated results, showing first 1000 rows.

It can often be helpful to look at the exact schema to ensure that you have the right types associated with the right columns. In this case we haven't done any manipulation, so every column is still a string.

bikeStations.printSchema()
tripData.printSchema()
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dock_count: string (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)

root
 |-- id: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- bike_id: string (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)
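If you did want proper numeric types up front, a minimal sketch of casting a few of the station columns might look like the following (column names are taken from the schema above). Whether to do this at all is a judgment call, since the graph-building code below does its own casts.

import org.apache.spark.sql.functions.col

// Cast a few columns to more useful types; the remaining columns can stay as strings
val typedStations = bikeStations
  .withColumn("id", col("id").cast("int"))
  .withColumn("lat", col("lat").cast("double"))
  .withColumn("long", col("long").cast("double"))
  .withColumn("dock_count", col("dock_count").cast("int"))

typedStations.printSchema()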

Imports

We're going to need to import several things before we can continue. Because GraphX is built on the RDD API, we're going to need to create some RDDs. We'll also import everything from GraphX for simplified access.

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col

Build the Graph

Now that we've imported our data, we need to build our graph. To do so we'll do two things: build the structure of the vertices (or nodes), and build the structure of the edges.

val justStations = bikeStations
  .selectExpr("float(id) as station_id", "name")
  .distinct()
justStations: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [station_id: float, name: string]
val stations = tripData
  .select("start_station_id").withColumnRenamed("start_station_id", "station_id")
  .union(tripData.select("end_station_id").withColumnRenamed("end_station_id", "station_id"))
  .distinct()
  .select(col("station_id").cast("long").alias("value"))
 
stations.take(1) // this is just a station_id at this point
stations: org.apache.spark.sql.DataFrame = [value: bigint]
res5: Array[org.apache.spark.sql.Row] = Array([69])

Now we can create our set of vertices and attach a bit of metadata to each of them, which in this case is the actual name of the station.

val stationVertices: RDD[(VertexId, String)] = stations
  .join(justStations, stations("value") === justStations("station_id"))
  .select(col("station_id").cast("long"), col("name"))
  .rdd
  .map(row => (row.getLong(0), row.getString(1))) // maintain type information
 
stationVertices.take(1)
stationVertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[29] at map at command-9873844:5
res7: Array[(org.apache.spark.graphx.VertexId, String)] = Array((51,Embarcadero at Folsom))

Now we can create the trip edges from all of our individual rides. We'll get the station values, then just add a dummy value of 1.

val stationEdges: RDD[Edge[Long]] = tripData
  .select(col("start_station_id").cast("long"), col("end_station_id").cast("long"))
  .rdd
  .map(row => Edge(row.getLong(0), row.getLong(1), 1))
 
stationEdges.take(1)
stationEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Long]] = MapPartitionsRDD[34] at map at command-9873846:4
res8: Array[org.apache.spark.graphx.Edge[Long]] = Array(Edge(66,66,1))

Now we can build our graph. You'll notice below that I make a default station. This is for any edges that don't point to one of our vertices (imagine some sort of data collection error, or a station that has gone out of service). It's worth understanding the data collection process, current or historical, to decide whether this merits more thought when applied to your own data.

I'm also going to cache our graph for faster access.

val defaultStation = "Missing Station"
val stationGraph = Graph(stationVertices, stationEdges, defaultStation)
stationGraph.cache()
defaultStation: String = Missing Station
stationGraph: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@48b8bb65
res9: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@48b8bb65
println("Total Number of Stations: " + stationGraph.numVertices)
println("Total Number of Trips: " + stationGraph.numEdges)
// sanity check
println("Total Number of Trips in Original Data: " + tripData.count)
Total Number of Stations: 70
Total Number of Trips: 669959
Total Number of Trips in Original Data: 669959

Now that we're all set up and have computed some basic statistics, let's run some algorithms!

PageRank

GraphX includes a number of built-in algorithms to leverage. PageRank is one of the best known, popularized by the Google search engine and created by Larry Page. To quote Wikipedia:

PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.

What's awesome about this concept is that it readily applies to any graph-like structure, be they web pages or bike stations. Let's go ahead and run PageRank on our data. We can run it either for a fixed number of iterations or until convergence: staticPageRank takes a number of iterations (an Int), while pageRank takes a convergence tolerance (a Double) and runs until the ranks stop changing by more than that amount.

val ranks = stationGraph.pageRank(0.0001).vertices
ranks
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false) // sort by the rank
  .take(10) // get the top 10
  .foreach(x => println(x._2._2))
San Jose Diridon Caltrain Station
San Francisco Caltrain (Townsend at 4th)
Mountain View Caltrain Station
Redwood City Caltrain Station
San Francisco Caltrain 2 (330 Townsend)
Harry Bridges Plaza (Ferry Building)
Embarcadero at Sansome
2nd at Townsend
Market at Sansome
Townsend at 7th
ranks: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[895] at RDD at VertexRDD.scala:57

We can see above that the Caltrain stations rank as significant! This makes sense since these are natural connectors, and getting from A to B without needing a car is likely one of the most popular uses of a bike share program!
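For reference, the fixed-iteration variant mentioned above is staticPageRank. A quick sketch, with ten iterations chosen arbitrarily here:

// Run PageRank for a fixed number of iterations instead of until convergence
val staticRanks = stationGraph.staticPageRank(10).vertices

staticRanks
  .join(stationVertices)
  .sortBy(_._2._1, ascending = false) // sort by the rank
  .take(10)
  .foreach(x => println(x._2._2))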

Trips From Station to Station

One question you might ask is: what are the most common trips from station to station? We can answer this by performing a grouping operation and summing the edge counts. This yields a new graph where each edge is now the sum of all the semantically identical edges. Think about it this way: we have a number of trips that are exactly the same from station A to station B, and we just want to count those up!

In the query below we grab the most common station-to-station trips and print out the top 10.

stationGraph
  .groupEdges((edge1, edge2) => edge1 + edge2)
  .triplets
  .sortBy(_.attr, ascending=false)
  .map(triplet => 
    "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr + ".")
  .take(10)
  .foreach(println)
There were 1849 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 1809 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend).
There were 1743 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1728 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1688 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 1653 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1551 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 1475 trips from 2nd at Townsend to Harry Bridges Plaza (Ferry Building).
There were 1439 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 1424 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
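You may notice the same station pair appearing more than once in the list above. That's because groupEdges only merges parallel edges that sit in the same partition; to merge every parallel edge, the graph needs to be repartitioned with partitionBy first. A sketch of that (the counts it prints will differ from the list above, since each pair is fully collapsed into a single edge):

import org.apache.spark.graphx.PartitionStrategy

// partitionBy co-locates parallel edges so groupEdges can merge all of them
stationGraph
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((edge1, edge2) => edge1 + edge2)
  .triplets
  .sortBy(_.attr, ascending = false)
  .map(triplet =>
    "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr + ".")
  .take(10)
  .foreach(println)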

In Degrees and Out Degrees

Remember that in this instance we've got a directed graph. That means that our trips are directional, from one location to another, so we get access to a wealth of analysis. We can find the number of trips that go into a specific station and the number that leave from a specific station.

Naturally we can sort this information and find the stations with lots of inbound and outbound trips! See the definition of vertex degrees for more information.

Let's go ahead and find the stations that have the most inbound and outbound traffic.

stationGraph
  .inDegrees // computes in Degrees
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " in degrees."))
San Francisco Caltrain (Townsend at 4th) has 63179 in degrees.
San Francisco Caltrain 2 (330 Townsend) has 35117 in degrees.
Harry Bridges Plaza (Ferry Building) has 33193 in degrees.
Embarcadero at Sansome has 30796 in degrees.
2nd at Townsend has 28529 in degrees.
Market at Sansome has 28033 in degrees.
Townsend at 7th has 26637 in degrees.
Steuart at Market has 25025 in degrees.
Temporary Transbay Terminal (Howard at Beale) has 23080 in degrees.
Market at 4th has 19915 in degrees.
stationGraph
  .outDegrees // out degrees
  .join(stationVertices)
  .sortBy(_._2._1, ascending=false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " out degrees."))
San Francisco Caltrain (Townsend at 4th) has 49092 out degrees.
San Francisco Caltrain 2 (330 Townsend) has 33742 out degrees.
Harry Bridges Plaza (Ferry Building) has 32934 out degrees.
Embarcadero at Sansome has 27713 out degrees.
Temporary Transbay Terminal (Howard at Beale) has 26089 out degrees.
2nd at Townsend has 25837 out degrees.
Steuart at Market has 24838 out degrees.
Market at Sansome has 24172 out degrees.
Townsend at 7th has 23724 out degrees.
Market at 10th has 20272 out degrees.

One interesting follow-up question we could ask is: which station has the highest ratio of in-degree to out-degree? In other words, which station acts as almost a pure trip sink, a station where trips end but rarely start?

stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out Degrees
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1, ascending=false)
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))
Redwood City Medical Center has a in/out degree ratio of 1.4533762057877813
Redwood City Public Library has a in/out degree ratio of 1.300469483568075
San Francisco Caltrain (Townsend at 4th) has a in/out degree ratio of 1.286951030717836
Washington at Kearney has a in/out degree ratio of 1.2548577376821652
MLK Library has a in/out degree ratio of 1.233038348082596

We can do something similar by getting the stations with the lowest in-degree to out-degree ratios, meaning that trips start from those stations but don't end there as often. This is essentially the opposite of what we have above.

stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out Degrees
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1) // ascending this time
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))

The conclusions from the above analysis should be relatively straightforward. A higher value means many more trips come into that station than leave it, and a lower value means many more trips leave from that station than come into it!

Hopefully you've gotten some value out of this notebook! Graph structures are everywhere once you start looking for them, and hopefully GraphX will help you analyze them better!