bikeStations.printSchema()
tripData.printSchema()
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- lat: string (nullable = true)
|-- long: string (nullable = true)
|-- dock_count: string (nullable = true)
|-- city: string (nullable = true)
|-- installation_date: string (nullable = true)
root
|-- id: string (nullable = true)
|-- duration: string (nullable = true)
|-- start_date: string (nullable = true)
|-- start_station_name: string (nullable = true)
|-- start_station_id: string (nullable = true)
|-- end_date: string (nullable = true)
|-- end_station_name: string (nullable = true)
|-- end_station_id: string (nullable = true)
|-- bike_id: string (nullable = true)
|-- subscription_type: string (nullable = true)
|-- zip_code: string (nullable = true)
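import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.col

// Collect every station id that appears as a trip start or a trip end
val stations = tripData
  .select("start_station_id").withColumnRenamed("start_station_id", "station_id")
  .union(tripData.select("end_station_id").withColumnRenamed("end_station_id", "station_id"))
  .distinct()
  .select(col("station_id").cast("long").alias("value"))
stations.take(1) // this is just a station_id at this point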
stations: org.apache.spark.sql.DataFrame = [value: bigint]
res5: Array[org.apache.spark.sql.Row] = Array([69])
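// justStations is not defined in this excerpt; it is assumed to be a DataFrame
// with station_id and name columns derived from bikeStations
val stationVertices: RDD[(VertexId, String)] = stations
  .join(justStations, stations("value") === justStations("station_id"))
  .select(col("station_id").cast("long"), col("name"))
  .rdd
  .map(row => (row.getLong(0), row.getString(1))) // maintain type information
stationVertices.take(1)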
stationVertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String)] = MapPartitionsRDD[29] at map at command-9873844:5
res7: Array[(org.apache.spark.graphx.VertexId, String)] = Array((51,Embarcadero at Folsom))
// One directed edge per trip, each carrying a weight of 1
val stationEdges: RDD[Edge[Long]] = tripData
  .select(col("start_station_id").cast("long"), col("end_station_id").cast("long"))
  .rdd
  .map(row => Edge(row.getLong(0), row.getLong(1), 1L))
stationEdges.take(1)
stationEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Long]] = MapPartitionsRDD[34] at map at command-9873846:4
res8: Array[org.apache.spark.graphx.Edge[Long]] = Array(Edge(66,66,1))
// Attribute given to any vertex that an edge references but stationVertices lacks
val defaultStation = "Missing Station"
val stationGraph = Graph(stationVertices, stationEdges, defaultStation)
stationGraph.cache()
defaultStation: String = Missing Station
stationGraph: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@48b8bb65
res9: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@48b8bb65
println("Total Number of Stations: " + stationGraph.numVertices)
println("Total Number of Trips: " + stationGraph.numEdges)
// sanity check
println("Total Number of Trips in Original Data: " + tripData.count)
Total Number of Stations: 70
Total Number of Trips: 669959
Total Number of Trips in Original Data: 669959
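// Run PageRank until the ranks converge within a tolerance of 0.0001
val ranks = stationGraph.pageRank(0.0001).vertices
ranks
  .join(stationVertices) // yields (id, (rank, name))
  .sortBy(_._2._1, ascending = false) // sort by the rank
  .take(10) // get the top 10
  .foreach(x => println(x._2._2))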
San Jose Diridon Caltrain Station
San Francisco Caltrain (Townsend at 4th)
Mountain View Caltrain Station
Redwood City Caltrain Station
San Francisco Caltrain 2 (330 Townsend)
Harry Bridges Plaza (Ferry Building)
Embarcadero at Sansome
2nd at Townsend
Market at Sansome
Townsend at 7th
ranks: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[895] at RDD at VertexRDD.scala:57
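// groupEdges only merges parallel edges stored in the same partition, so the graph
// is repartitioned first; without partitionBy the same station pair can appear
// several times, each with a partial count
stationGraph
  .partitionBy(org.apache.spark.graphx.PartitionStrategy.RandomVertexCut)
  .groupEdges((edge1, edge2) => edge1 + edge2)
  .triplets
  .sortBy(_.attr, ascending = false)
  .map(triplet =>
    "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr + ".")
  .take(10)
  .foreach(println)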
There were 1849 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 1809 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend).
There were 1743 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1728 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1688 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 1653 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome.
There were 1551 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
There were 1475 trips from 2nd at Townsend to Harry Bridges Plaza (Ferry Building).
There were 1439 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th).
There were 1424 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th.
stationGraph
  .inDegrees // number of trips ending at each station
  .join(stationVertices)
  .sortBy(_._2._1, ascending = false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " in degrees."))
San Francisco Caltrain (Townsend at 4th) has 63179 in degrees.
San Francisco Caltrain 2 (330 Townsend) has 35117 in degrees.
Harry Bridges Plaza (Ferry Building) has 33193 in degrees.
Embarcadero at Sansome has 30796 in degrees.
2nd at Townsend has 28529 in degrees.
Market at Sansome has 28033 in degrees.
Townsend at 7th has 26637 in degrees.
Steuart at Market has 25025 in degrees.
Temporary Transbay Terminal (Howard at Beale) has 23080 in degrees.
Market at 4th has 19915 in degrees.
stationGraph
  .outDegrees // number of trips starting at each station
  .join(stationVertices)
  .sortBy(_._2._1, ascending = false)
  .take(10)
  .foreach(x => println(x._2._2 + " has " + x._2._1 + " out degrees."))
San Francisco Caltrain (Townsend at 4th) has 49092 out degrees.
San Francisco Caltrain 2 (330 Townsend) has 33742 out degrees.
Harry Bridges Plaza (Ferry Building) has 32934 out degrees.
Embarcadero at Sansome has 27713 out degrees.
Temporary Transbay Terminal (Howard at Beale) has 26089 out degrees.
2nd at Townsend has 25837 out degrees.
Steuart at Market has 24838 out degrees.
Market at Sansome has 24172 out degrees.
Townsend at 7th has 23724 out degrees.
Market at 10th has 20272 out degrees.
// Stations with the highest in/out ratio
stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out degrees
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble / x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1, ascending = false)
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))
Redwood City Medical Center has a in/out degree ratio of 1.4533762057877813
Redwood City Public Library has a in/out degree ratio of 1.300469483568075
San Francisco Caltrain (Townsend at 4th) has a in/out degree ratio of 1.286951030717836
Washington at Kearney has a in/out degree ratio of 1.2548577376821652
MLK Library has a in/out degree ratio of 1.233038348082596
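// Stations with the lowest in/out ratio. The join must use outDegrees:
// joining inDegrees with itself yields a ratio of exactly 1.0 for every station.
stationGraph
  .inDegrees
  .join(stationGraph.outDegrees) // join with out degrees
  .join(stationVertices) // join with our other stations
  .map(x => (x._2._1._1.toDouble / x._2._1._2.toDouble, x._2._2)) // ratio of in to out
  .sortBy(_._1) // ascending: lowest ratios first
  .take(5)
  .foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))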
San Jose Diridon Caltrain Station has a in/out degree ratio of 1.0
San Jose Civic Center has a in/out degree ratio of 1.0
Santa Clara at Almaden has a in/out degree ratio of 1.0
Adobe on Almaden has a in/out degree ratio of 1.0
San Pedro Square has a in/out degree ratio of 1.0
Graph Analysis with GraphX
This notebook walks through basic graph analysis with the GraphX API. We use publicly available bike data from the Bay Area Bike Share portal, specifically the second year of data.
Note: GraphX computation is only supported using the Scala and RDD APIs.
Graph Theory and Graph Processing
Graph processing is an important aspect of analysis that applies to many use cases. Fundamentally, graph theory and graph processing are about defining relationships between entities. Nodes (or vertices) are the units, and edges are the relationships defined between them. This model works well for social network analysis and for running algorithms like PageRank to better understand and weigh relationships.
Some business use cases include finding central people in social networks (who is most popular in a group of friends), measuring the importance of papers in bibliographic networks (which papers are most referenced), and, of course, ranking web pages.
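To make the idea of weighing relationships concrete, here is a minimal sketch of the textbook PageRank iteration in plain Scala collections, with no Spark involved. The object name `PageRankSketch`, the toy edge list, and the damping factor of 0.85 are illustrative assumptions. This is not GraphX's implementation, which differs in details such as how ranks are scaled and how convergence is decided; the sketch also assumes every node has at least one outgoing edge.

```scala
// Hypothetical illustration: node ids and edges are made up.
object PageRankSketch {
  // Directed edges as (source, destination) pairs.
  val edges: Seq[(Int, Int)] = Seq((1, 2), (2, 3), (3, 1), (4, 3))

  // Textbook PageRank with a fixed number of iterations.
  // Assumes every node has at least one outgoing edge (no dangling nodes).
  def pageRank(edges: Seq[(Int, Int)], iterations: Int, damping: Double = 0.85): Map[Int, Double] = {
    val nodes = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (s, es) => s -> es.size }
    // Start from a uniform distribution over all nodes.
    var ranks: Map[Int, Double] = nodes.map(n => n -> (1.0 / nodes.size)).toMap
    for (_ <- 1 to iterations) {
      // Each node splits its current rank evenly across its outgoing edges.
      val contributions = edges
        .map { case (s, d) => d -> (ranks(s) / outDegree(s)) }
        .groupBy(_._1)
        .map { case (d, cs) => d -> cs.map(_._2).sum }
      // New rank = small uniform share + damped sum of incoming contributions.
      ranks = nodes.map { n =>
        n -> ((1 - damping) / nodes.size + damping * contributions.getOrElse(n, 0.0))
      }.toMap
    }
    ranks
  }
}
```

Calling `PageRankSketch.pageRank(PageRankSketch.edges, iterations = 50)` gives node 3 the highest rank, since it is the only node with two incoming edges, and node 4 the lowest, since nothing points to it.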
Graphs and Bike Trip Data
As mentioned, this example uses Bay Area Bike Share data, which is free for public use on the Bay Area Bike Share portal. We model every station as a vertex and every trip as an edge connecting two stations, which creates a directed graph.
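The station-as-vertex, trip-as-edge model can be sketched in plain Scala collections before bringing in GraphX. The names `Trip`, `TripGraphSketch`, `inDegrees`, and `outDegrees`, along with the station ids and trips, are made up for illustration; the notebook itself builds the graph from `tripData` with GraphX.

```scala
// Hypothetical illustration: station ids and trips are made up.
final case class Trip(startStationId: Long, endStationId: Long)

object TripGraphSketch {
  // Each trip is a directed edge: start station -> end station.
  val trips: Seq[Trip] = Seq(Trip(1L, 2L), Trip(1L, 2L), Trip(2L, 1L), Trip(3L, 2L))

  // Vertices are the distinct stations that appear at either end of a trip.
  val stationIds: Set[Long] =
    trips.flatMap(t => Seq(t.startStationId, t.endStationId)).toSet

  // In-degree: number of trips ending at each station.
  def inDegrees(ts: Seq[Trip]): Map[Long, Int] =
    ts.groupBy(_.endStationId).map { case (id, group) => id -> group.size }

  // Out-degree: number of trips starting at each station.
  def outDegrees(ts: Seq[Trip]): Map[Long, Int] =
    ts.groupBy(_.startStationId).map { case (id, group) => id -> group.size }
}
```

Because the edges are directed, a trip from station 1 to station 2 counts toward station 1's out-degree and station 2's in-degree. In this sketch a station with no arriving trips (station 3) is simply absent from the in-degree map.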