Graph Analysis with GraphFrames

In this notebook we’ll go over basic graph analysis using the GraphFrames package available on spark-packages.org. The goal is to show you how to use GraphFrames to perform graph analysis. We’ll be working with publicly available bike data from the Bay Area Bike Share portal, specifically the second year of data available on that site.

You can also import this as a notebook by scrolling all the way to the bottom and clicking the link to the notebook.

Graph Processing Primer

Graph processing is an important aspect of analysis that applies to a lot of use cases. Fundamentally, graph theory and processing are about describing how things relate to one another. Nodes (or vertices) are the units, while edges define the relationships between nodes. This works great for social network analysis and for running algorithms like PageRank to better understand and weigh relationships.

Some business use cases could be to look at central people in social networks [who is most popular in a group of friends], importance of papers in bibliographic networks [which papers are most referenced], and of course ranking web pages!

As mentioned, in this example we’ll be using Bay Area Bike Share data. This data is free for use by the public on the website linked above. We’re going to orient our analysis by making every station a vertex and every trip an edge connecting two stations. This creates a directed graph.
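To make that orientation concrete, here’s a minimal sketch in plain Scala (no Spark needed). The station names, the trips, and the `Station` and `Trip` case classes are all made up for illustration; they are not part of the bike share data or of GraphFrames.

```scala
// Hypothetical toy data: three stations and four trips between them.
final case class Station(id: String)
final case class Trip(src: String, dst: String)

val toyStations = Seq(Station("A"), Station("B"), Station("C"))
val toyGraphTrips = Seq(Trip("A", "B"), Trip("A", "B"), Trip("B", "A"), Trip("C", "A"))

// The graph is directed: a trip from A to B is a different edge than a trip from B to A.
val aToB = toyGraphTrips.count(t => t.src == "A" && t.dst == "B")
val bToA = toyGraphTrips.count(t => t.src == "B" && t.dst == "A")

println(s"A -> B trips: $aToB, B -> A trips: $bToA")
```

Repeated trips between the same pair of stations simply show up as parallel edges, which is what lets us count them later on.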


Setup & Data

Prior to reading in some data we’re going to need to install the GraphFrames library (see Libraries). Be sure to follow the directions for specifying the Maven coordinates. To do that you’ll need to get this information from the Spark-Packages repository. Currently (Feb. 2016) the latest version is graphframes:graphframes:0.1.0-spark1.6; however, it is worth ensuring that you are using the latest version!

Once you’ve installed the library and attached it to your cluster you’re ready to download the data!

To get the data into your workspace, you’re going to want to download the data and unzip it on your computer. After you’ve unzipped the files, upload the 201508_station_data.csv and the 201508_trip_data.csv using the Tables UI in Databricks.

Be sure not to change any column types or do any preprocessing before upload, as it’s not necessary in this example. After the files are uploaded as tables, we can import them as DataFrames into Databricks below. I’ve named the tables sf_201508_station_data and sf_201508_trip_data but feel free to call them something else!

val bikeStations = sqlContext.sql("SELECT * FROM sf_201508_station_data")
val tripData = sqlContext.sql("SELECT * FROM sf_201508_trip_data")
display(bikeStations)
display(tripData)

It can often be helpful to look at the exact schema to ensure that you have the right types associated with the right columns. In this case we haven’t done any manipulation, so every column will be a string.

bikeStations.printSchema()
tripData.printSchema()

Imports

We’re going to need to perform some imports before we can continue. We’ll import a variety of SQL functions that make working with DataFrames much easier, along with everything we need from GraphFrames.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import org.graphframes._

Building the Graph

Now that we’ve imported our data, we’re going to need to build our graph. To do so we’re going to do two things: build the structure of the vertices (or nodes) and build the structure of the edges. What’s awesome about GraphFrames is that this process is incredibly simple. All that we need to do is rename our name column to id in the vertices table, and rename the start and end stations to src and dst respectively in our edges table. These are the required naming conventions for vertices and edges in GraphFrames as of the time of this writing (Feb. 2016).

val stationVertices = bikeStations
  .withColumnRenamed("name", "id")
  .distinct()

val tripEdges = tripData
  .withColumnRenamed("Start Station", "src")
  .withColumnRenamed("End Station", "dst")
display(stationVertices)
display(tripEdges)

Now we can build our graph.

I’m also going to cache the input DataFrames to our graph so that the repeated queries below don’t have to recompute them.

val stationGraph = GraphFrame(stationVertices, tripEdges)

tripEdges.cache()
stationVertices.cache()
println("Total Number of Stations: " + stationGraph.vertices.count)
println("Total Number of Trips in Graph: " + stationGraph.edges.count)
println("Total Number of Trips in Original Data: " + tripData.count) // sanity check

Now that we’re all set up and have computed some basic statistics, let’s run some algorithms!

PageRank

Because GraphFrames builds on GraphX, there are algorithms that we can leverage right away. PageRank is one of the more popular ones; it was created by Larry Page and Sergey Brin and popularized by the Google search engine. To quote Wikipedia:

PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.
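To make the quoted idea a bit more concrete, here’s a rough sketch of the PageRank iteration in plain Scala on a tiny made-up graph. This is a simplified illustration, not the GraphX or GraphFrames implementation: the graph, the `toyPageRank` helper, and all variable names are hypothetical, and it assumes every node has at least one outgoing edge.

```scala
// Hypothetical toy graph: each pair is a directed link (src -> dst).
val linkEdges = Seq("A" -> "B", "B" -> "C", "C" -> "A", "D" -> "C")
val linkNodes = linkEdges.flatMap { case (s, d) => Seq(s, d) }.distinct
val linkOutDegree = linkEdges.groupBy(_._1).map { case (s, es) => s -> es.size }

// Simplified PageRank: every node starts at 1.0 and, on each iteration,
// splits its current rank evenly across its outgoing links.
def toyPageRank(resetProbability: Double, maxIter: Int): Map[String, Double] = {
  var ranks = linkNodes.map(n => n -> 1.0).toMap
  for (_ <- 1 to maxIter) {
    // Sum the contributions arriving at each destination node.
    val received = linkEdges
      .map { case (s, d) => d -> ranks(s) / linkOutDegree(s) }
      .groupBy(_._1)
      .map { case (d, cs) => d -> cs.map(_._2).sum }
    ranks = linkNodes.map { n =>
      n -> (resetProbability + (1 - resetProbability) * received.getOrElse(n, 0.0))
    }.toMap
  }
  ranks
}

val toyRanks = toyPageRank(resetProbability = 0.15, maxIter = 10)
toyRanks.toSeq.sortBy { case (_, r) => -r }.foreach(println)
```

Node D has no incoming links, so it never receives any rank from its neighbors and ends up with just the reset value, while nodes that are linked to keep accumulating rank from the nodes pointing at them.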

What’s awesome about this concept is that it readily applies to any graph-type structure, be it web pages or bike stations. Let’s go ahead and run PageRank on our data. Unlike the GraphX API, we set the parameters of the model a bit differently, as you’ll see below. What’s great, however, is that we automatically get to keep all of the vertex data at the end without having to do any extra work - we’ve gotten a speed up and usability improvement for free!

val ranks = stationGraph.pageRank.resetProbability(0.15).maxIter(10).run()

display(ranks.vertices.orderBy(desc("pagerank")))

We can see above that being a Caltrain station seems to be significant! This makes sense, as these stations are natural connectors, and getting from A to B without needing a car is likely one of the most popular uses of these bike share programs.

Trips From Station to Station

One question is which station-to-station trips are the most common in the dataset. We can answer this by performing a grouping operation and counting the edges in each group. This yields a new set of edges in which each edge carries the count of all of the semantically identical edges. Think about it this way: we have a number of trips that are exactly the same, from station A to station B, and we just want to count those up!

In the below query you’ll see that we’re going to grab the station to station trips that are most common and print out the top 10.

val topTrips = stationGraph
  .edges
  .groupBy("src", "dst")
  .count()
  .orderBy(desc("count"))
  .limit(10)

display(topTrips)
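If the grouping step feels abstract, here’s the same idea sketched on a plain Scala collection. The trips below are made up, and `toyTripPairs` and `toyTripCounts` are hypothetical names; the point is only to mirror the group, count, and sort steps of the query above.

```scala
// Hypothetical trips as (src, dst) pairs.
val toyTripPairs = Seq(("A", "B"), ("A", "B"), ("B", "A"), ("A", "B"), ("C", "A"))

// Group identical (src, dst) pairs, count each group, and sort by count descending.
val toyTripCounts = toyTripPairs
  .groupBy(identity)
  .map { case (pair, group) => pair -> group.size }
  .toSeq
  .sortBy { case (_, n) => -n }

toyTripCounts.take(2).foreach(println)
```

The counts always add back up to the original number of trips, which makes for a handy sanity check on the real data as well.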

In Degrees and Out Degrees

Remember that in this instance we’ve got a directed graph. That means that our trips are directional - from one location to another. This gives us access to a wealth of analysis: we can find the number of trips that go into a specific station and the number that leave from it.

Naturally we can sort this information and find the stations with lots of inbound and outbound trips! Check out this definition of Vertex Degrees for more information.
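As a quick illustration of what in degrees and out degrees mean, here’s a small plain-Scala sketch on made-up edges. The names `degEdges`, `toyInDegree`, and `toyOutDegree` are hypothetical and only serve this example.

```scala
// Hypothetical directed edges (src -> dst).
val degEdges = Seq("A" -> "B", "A" -> "B", "B" -> "A", "C" -> "A")

// In degree: how many edges end at a vertex. Out degree: how many start there.
val toyInDegree = degEdges.groupBy(_._2).map { case (v, es) => v -> es.size }
val toyOutDegree = degEdges.groupBy(_._1).map { case (v, es) => v -> es.size }

println(s"in degrees: $toyInDegree")
println(s"out degrees: $toyOutDegree")
```

Notice that C never shows up in the in-degree map, because no edge ends there. A vertex with zero edges in a given direction is simply missing from that result rather than listed with a zero.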

Now that we’ve defined that process, let’s go ahead and find the stations that have lots of inbound and outbound traffic.

val inDeg = stationGraph.inDegrees
display(inDeg.orderBy(desc("inDegree")).limit(5))
val outDeg = stationGraph.outDegrees
display(outDeg.orderBy(desc("outDegree")).limit(5))

One interesting follow-up question is which station has the highest ratio of in degrees to out degrees. In other words, which station acts as almost a pure trip sink: a station where trips end but rarely start?

val degreeRatio = inDeg.join(outDeg, inDeg.col("id") === outDeg.col("id"))
  .drop(outDeg.col("id"))
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")

degreeRatio.cache()

display(degreeRatio.orderBy(desc("degreeRatio")).limit(10))
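One thing worth keeping in mind about the join above: as far as I know, inDegrees and outDegrees only return vertices that have at least one edge in that direction, so an inner join silently drops any station that has no inbound or no outbound trips. Here’s a plain-Scala sketch of that behavior using made-up degree maps (`toyIn`, `toyOut`, and `toyRatio` are hypothetical names).

```scala
// Hypothetical degree tables keyed by station id.
val toyIn = Map("A" -> 6, "B" -> 2, "C" -> 3)
val toyOut = Map("A" -> 2, "B" -> 4) // C never appears as a trip source

// Inner-join semantics: keep only ids present in both maps, then take the ratio.
val toyRatio = toyIn.toSeq
  .collect { case (id, in) if toyOut.contains(id) => id -> in.toDouble / toyOut(id) }
  .sortBy { case (_, ratio) => -ratio }

toyRatio.foreach(println)
```

Station C would arguably be the purest sink of all, yet it doesn’t appear in the result. If stations like that matter for your analysis, an outer join with a default value for the missing side is one option to consider.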

We can do something similar by getting the stations with the lowest ratio of in degrees to out degrees, meaning that trips start from that station but don’t end there as often. This is essentially the opposite of what we have above.

display(degreeRatio.orderBy(asc("degreeRatio")).limit(10))