This topic shows you how to use the GraphFrames package to perform basic graph analysis. You’re going to do this with publicly available bike data from the Bay Area Bike Share portal, specifically the second year of data.
Graph processing is an important aspect of analysis that applies to a lot of use cases. Fundamentally, graph theory and graph processing are about defining relationships between different nodes. Nodes (or vertices) are the units, while edges define the relationships between them. This works great for social network analysis and for running algorithms like PageRank to better understand and weigh relationships.
Some business use cases are finding central people in social networks (who is most popular in a group of friends), measuring the importance of papers in bibliographic networks (which papers are most referenced), and of course ranking web pages.
As mentioned, in this example you’ll be using Bay Area bike share data. This data is free for use by the public on the website linked above. You’re going to orient your analysis by making every station a vertex and every trip an edge connecting two stations. This creates a directed graph.
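If it helps to see that mapping concretely, here is a minimal plain-Scala sketch that needs no Spark. The `StationGraphSketch` object, the `Trip` and `Edge` case classes, and their field names are made up for illustration. The sketch also derives the vertices from the trips themselves, which is a simplification: the rest of this topic reads the stations from their own table.

```scala
object StationGraphSketch {
  // Hypothetical shape of one bike trip (illustration only).
  final case class Trip(startStation: String, endStation: String)

  // A directed edge: src and dst are not interchangeable.
  final case class Edge(src: String, dst: String)

  // Every station that appears in at least one trip becomes a vertex.
  def vertices(trips: Seq[Trip]): Set[String] =
    trips.flatMap(t => Seq(t.startStation, t.endStation)).toSet

  // Every trip becomes exactly one edge, so repeated trips stay as parallel edges.
  def edges(trips: Seq[Trip]): Seq[Edge] =
    trips.map(t => Edge(t.startStation, t.endStation))
}
```

Because `Edge("A", "B")` and `Edge("B", "A")` are different values, a trip from A to B is never confused with the return trip, which is what makes the graph directed.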
Prior to reading in some data you’re going to need to install the GraphFrames library as a Spark package. See Libraries. Be sure to follow the directions for specifying the Maven coordinates.
Once you’ve installed the library and attached it to your cluster you’re ready to download the data.
To get the data into your workspace, download it and unzip it on your computer. After you’ve unzipped the files, upload the station data file and 201508_trip_data.csv using one of the methods in Accessing Data.
Be sure not to change any column types or do any preprocessing before upload, as it’s not necessary in this example. After the files upload as tables, you can import them as DataFrames into Databricks below. I’ve named the tables sf_201508_station_data and sf_201508_trip_data, but feel free to call them something else.
val bikeStations = sqlContext.sql("SELECT * FROM sf_201508_station_data")
val tripData = sqlContext.sql("SELECT * FROM sf_201508_trip_data")
It can often be helpful to look at the exact schema to ensure that you have the right types associated with the right columns. In this case you haven’t done any manipulation so you won’t have anything besides
You need to perform a few imports before you can continue. You’re going to import a variety of SQL functions that make working with DataFrames much easier, along with everything you need from GraphFrames.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Now that you’ve imported your data, you need to build your graph. To do so you’re going to do two things: build the structure of the vertices (or nodes) and build the structure of the edges. What’s awesome about GraphFrames is that this process is incredibly simple. All you need to do is rename your name column to id in the vertices table, and the start and end stations to src and dst respectively in the edges table. These are the required naming conventions for vertices and edges in GraphFrames as of the time of this writing (Feb. 2016).
val stationVertices = bikeStations
  .withColumnRenamed("name", "id")
  .distinct()

val tripEdges = tripData
  .withColumnRenamed("Start Station", "src")
  .withColumnRenamed("End Station", "dst")
Now you can build your graph.
I’m also going to cache the input DataFrames:
val stationGraph = GraphFrame(stationVertices, tripEdges)

tripEdges.cache()
stationVertices.cache()
println("Total Number of Stations: " + stationGraph.vertices.count)
println("Total Number of Trips in Graph: " + stationGraph.edges.count)
println("Total Number of Trips in Original Data: " + tripData.count) // sanity check
Now that you’re all set up and have computed some basic statistics, let’s run some algorithms.
Because GraphFrames builds on GraphX, there are algorithms that you can leverage right away. PageRank is one of the more popular ones; it was popularized by the Google search engine and created by Larry Page. To quote Wikipedia:
PageRank works by counting the number and quality of links to a page to determine a rough estimate of how important the website is. The underlying assumption is that more important websites are likely to receive more links from other websites.
What’s awesome about this concept is that it readily applies to any graph-type structure, be it web pages or bike stations. Let’s go ahead and run PageRank on your data. Unlike in the GraphX API, you set the parameters on the model a bit differently, as you’ll see below. What’s great, however, is that you automatically keep all of the vertex data at the end without having to do any extra work, so you get a speedup and a usability improvement for free.
val ranks = stationGraph.pageRank.resetProbability(0.15).maxIter(10).run()

display(ranks.vertices.orderBy(desc("pagerank")))
You can see above that a given vertex being a Caltrain station seems to be significant. This makes sense: these stations are natural connectors, and getting from A to B without needing a car is likely one of the most popular uses of these bike share programs.
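To make the PageRank step feel less like a black box, here is a rough plain-Scala sketch of the iteration on a small edge list. It is not the GraphFrames or GraphX implementation. It assumes one common unnormalized update rule (every rank starts at 1.0, then each vertex gets `resetProbability` plus the damped sum of what its incoming edges send), and the `PageRankSketch` object is hypothetical. The parameter names simply mirror the ones in the call above.

```scala
object PageRankSketch {
  // edges are (src, dst) pairs; parallel edges are allowed and each one counts.
  def pageRank(
      edges: Seq[(String, String)],
      resetProbability: Double = 0.15,
      maxIter: Int = 10
  ): Map[String, Double] = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    // Number of outgoing edges per source vertex.
    val outDegree = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

    // Assumption: every vertex starts with rank 1.0.
    var ranks = vertices.map(v => v -> 1.0).toMap

    for (_ <- 1 to maxIter) {
      // Each edge carries an equal share of its source's current rank.
      val received = edges
        .map { case (s, d) => d -> ranks(s) / outDegree(s) }
        .groupBy(_._1)
        .map { case (v, shares) => v -> shares.map(_._2).sum }

      // Vertices with no incoming edges fall back to the reset term only.
      ranks = vertices.map { v =>
        v -> (resetProbability + (1 - resetProbability) * received.getOrElse(v, 0.0))
      }.toMap
    }
    ranks
  }
}
```

With edges such as `Seq(("A", "Hub"), ("B", "Hub"), ("C", "Hub"), ("Hub", "A"))`, the vertex that many others point to ends up with a higher rank than the vertices nothing points to, which is the same effect you see for the connector stations.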
One question is which station-to-station trips are most common in the dataset. You can answer it by grouping the edges on their source and destination and counting them. This yields a new set of edges in which each edge carries the count of all of the semantically identical edges. Think about it this way: you have a number of trips that are exactly the same, from station A to station B, and you just want to count those up.
In the query below you grab the most common station-to-station trips and print out the top 10.
val topTrips = stationGraph
  .edges
  .groupBy("src", "dst")
  .count()
  .orderBy(desc("count"))
  .limit(10)

display(topTrips)
Remember that in this instance you’ve got a directed graph. That means that your trips are directional - from one location to another. Therefore you get access to a wealth of analysis that you can use. You can find the number of trips that go into a specific station and leave from a specific station.
Naturally you can sort this information and find the stations with lots of inbound and outbound trips. Check out this definition of Vertex Degrees for more information.
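Here is a small sketch of what those two counts mean, using plain Scala collections instead of a GraphFrame. The `DegreeSketch` object and the `(start, end)` tuple representation of a trip are assumptions made for the example.

```scala
object DegreeSketch {
  // A trip as a (start station, end station) pair (illustrative representation).
  type Trip = (String, String)

  // In-degree: how many trips end at each station.
  def inDegrees(trips: Seq[Trip]): Map[String, Int] =
    trips.groupBy(_._2).map { case (station, ts) => station -> ts.size }

  // Out-degree: how many trips start at each station.
  def outDegrees(trips: Seq[Trip]): Map[String, Int] =
    trips.groupBy(_._1).map { case (station, ts) => station -> ts.size }
}
```

Note that in this sketch a station with no inbound trips simply has no entry in the in-degree map (and likewise for outbound trips), rather than an entry with a count of zero.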
Now that you’ve defined that process, let’s go ahead and find the stations that have lots of inbound and outbound traffic.
val inDeg = stationGraph.inDegrees

display(inDeg.orderBy(desc("inDegree")).limit(5))
val outDeg = stationGraph.outDegrees

display(outDeg.orderBy(desc("outDegree")).limit(5))
One interesting follow-up question is which station has the highest ratio of in-degree to out-degree. In other words, which station acts almost as a pure trip sink: a station where trips end but rarely start?
val degreeRatio = inDeg.join(outDeg, inDeg.col("id") === outDeg.col("id"))
  .drop(outDeg.col("id"))
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")

degreeRatio.cache()

display(degreeRatio.orderBy(desc("degreeRatio")).limit(10))
You can do something similar by getting the stations with the lowest in-degree to out-degree ratios, meaning that trips start from those stations but don’t end there as often. This is essentially the opposite of what you have above.
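The plain-Scala sketch below shows both ends of that ranking on a toy edge list. It mimics the inner join used above by keeping only stations that have both an in-degree and an out-degree, so a station with no outbound trips at all gets no ratio. The `DegreeRatioSketch` object and its method names are hypothetical, and it is worth checking on your own data whether any stations drop out this way.

```scala
object DegreeRatioSketch {
  type Trip = (String, String) // (start station, end station), illustrative

  // inDegree / outDegree for stations that appear on both sides.
  def degreeRatios(trips: Seq[Trip]): Map[String, Double] = {
    val inDeg = trips.groupBy(_._2).map { case (s, ts) => s -> ts.size }
    val outDeg = trips.groupBy(_._1).map { case (s, ts) => s -> ts.size }
    // Inner-join semantics: stations missing from either map are skipped.
    inDeg.collect {
      case (station, in) if outDeg.contains(station) =>
        station -> in.toDouble / outDeg(station)
    }
  }

  // Highest ratios first: trips mostly end here.
  def topSinks(trips: Seq[Trip], n: Int): Seq[(String, Double)] =
    degreeRatios(trips).toSeq.sortBy { case (_, ratio) => -ratio }.take(n)

  // Lowest ratios first: trips mostly start here.
  def topSources(trips: Seq[Trip], n: Int): Seq[(String, Double)] =
    degreeRatios(trips).toSeq.sortBy { case (_, ratio) => ratio }.take(n)
}
```

On the DataFrame side, the analogous step should be to sort `degreeRatio` in ascending rather than descending order, for example with `asc("degreeRatio")` from the same SQL functions import.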