GraphFrames User Guide (Python)

GraphFrames is a package for Apache Spark which provides DataFrame-based Graphs. It provides high-level APIs in Scala, Java, and Python. It aims to provide both the functionality of GraphX and extended functionality taking advantage of Spark DataFrames. This extended functionality includes motif finding, DataFrame-based serialization, and highly expressive graph queries.

The GraphFrames package is available from Spark Packages.

This notebook demonstrates examples from the GraphFrames User Guide.

from graphframes import *

Creating GraphFrames

Users can create GraphFrames from vertex and edge DataFrames.

  • Vertex DataFrame: A vertex DataFrame should contain a special column named “id” which specifies unique IDs for each vertex in the graph.
  • Edge DataFrame: An edge DataFrame should contain two special columns: “src” (source vertex ID of edge) and “dst” (destination vertex ID of edge).

Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.
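The two column requirements above can be sanity-checked on plain rows before any DataFrame is built. The following is a minimal plain-Python sketch that needs no Spark; `validate_graph_rows` is a hypothetical helper written for this illustration and is not part of GraphFrames.

```python
def validate_graph_rows(vertices, edges):
    """Return a list of problems found in plain-Python vertex and edge rows.

    vertices: list of dicts, each expected to carry an "id" key.
    edges: list of dicts, each expected to carry "src" and "dst" keys.
    """
    problems = []
    ids = [v.get("id") for v in vertices]
    if None in ids:
        problems.append("vertex without an 'id'")
    if len(set(ids)) != len(ids):
        problems.append("duplicate vertex ids")
    known = set(ids)
    for e in edges:
        if "src" not in e or "dst" not in e:
            problems.append("edge without 'src'/'dst': %r" % (e,))
        elif e["src"] not in known or e["dst"] not in known:
            problems.append("edge references unknown vertex: %r" % (e,))
    return problems


sample_vertices = [{"id": "a", "name": "Alice"}, {"id": "b", "name": "Bob"}]
sample_edges = [{"src": "a", "dst": "b", "relationship": "friend"}]
print(validate_graph_rows(sample_vertices, sample_edges))  # prints []
```

Extra keys such as "name" or "relationship" are ignored by the check, which mirrors the point that both DataFrames may carry arbitrary attribute columns.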

Create the vertices first:

vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])

And then some edges:

edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

Let’s create a graph from these vertices and these edges:

g = GraphFrame(vertices, edges)
print(g)
# This example graph also comes with the GraphFrames package.
# from graphframes.examples import Graphs
# g = Graphs(sqlContext).friends()

Basic graph and DataFrame queries

GraphFrames provides simple graph queries, such as node degree.

Also, since GraphFrames represent graphs as pairs of vertex and edge DataFrames, it is easy to make powerful queries directly on the vertex and edge DataFrames. Those DataFrames are available as vertices and edges fields in the GraphFrame.

display(g.vertices)
display(g.edges)

The incoming degree of the vertices:

display(g.inDegrees)

The outgoing degree of the vertices:

display(g.outDegrees)

The degree of the vertices:

display(g.degrees)
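To make the three degree notions concrete, here is a plain-Python sketch that recomputes them from the example edge list with `collections.Counter`. It only illustrates the definitions and is not how GraphFrames computes them.

```python
from collections import Counter

# Edge list copied from the example graph: (src, dst, relationship).
edges = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

in_degrees = Counter(dst for _, dst, _ in edges)   # edges arriving at a vertex
out_degrees = Counter(src for src, _, _ in edges)  # edges leaving a vertex
degrees = in_degrees + out_degrees                 # both directions combined

print(sorted(in_degrees.items()))
print(sorted(out_degrees.items()))
print(sorted(degrees.items()))
```

Vertex "g" has no edges, so it does not appear in any of the three counters in this sketch.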

You can run queries directly on the vertices DataFrame. For example, we can find the age of the youngest person in the graph:

youngest = g.vertices.groupBy().min("age")
display(youngest)

Likewise, you can run queries on the edges DataFrame. For example, let us count the number of ‘follow’ relationships in the graph:

numFollows = g.edges.filter("relationship = 'follow'").count()
print("The number of follow edges is", numFollows)

Motif finding

Build more complex relationships involving edges and vertices using motifs. The following cell finds the pairs of vertices with edges in both directions between them. The result is a DataFrame in which the column names are the motif keys.

Check out the GraphFrames User Guide for more details on the API.

# Search for pairs of vertices with edges in both directions between them.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Since the result is a DataFrame, more complex queries can build on top of the motif. Let us find all the reciprocal relationships in which at least one person is older than 30:

filtered = motifs.filter("b.age > 30 or a.age > 30")
display(filtered)
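What the reciprocal-edge motif and the age filter select can be reproduced on the example data in plain Python. This is a sketch of the matching semantics only, with no Spark involved; the variable names are chosen for this illustration.

```python
# Ages and edges copied from the example graph.
ages = {"a": 34, "b": 36, "c": 30, "d": 29, "e": 32, "f": 36, "g": 60}
edges = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

edge_pairs = {(src, dst) for src, dst, _ in edges}

# A pair (a, b) matches "(a)-[e]->(b); (b)-[e2]->(a)" when both directions exist.
reciprocal = sorted((a, b) for (a, b) in edge_pairs if (b, a) in edge_pairs)

# Keep the matches in which at least one endpoint is older than 30.
older = [(a, b) for (a, b) in reciprocal if ages[a] > 30 or ages[b] > 30]

print(reciprocal)  # prints [('b', 'c'), ('c', 'b')]
print(older)       # prints [('b', 'c'), ('c', 'b')]
```

Each reciprocal pair shows up twice, once per assignment of the motif keys a and b. Both rows survive the filter because Bob (36) takes part in each of them.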

Subgraphs

Build subgraphs by filtering a subset of edges and vertices. For example, the following subgraph keeps only the 'follow' edges whose source vertex is younger than the destination vertex.

paths = g.find("(a)-[e]->(b)")\
  .filter("e.relationship = 'follow'")\
  .filter("a.age < b.age")
# The `paths` variable contains both vertex and edge information; extract the edge columns:
e2 = paths.select("e.src", "e.dst", "e.relationship")

# In Spark 1.5+, the user may simplify the previous call to:
# e2 = paths.select("e.*")

# Construct the subgraph
g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
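The edge selection performed by the two filters can be checked by hand on the example data. The plain-Python sketch below applies the same two conditions to the edge list; it illustrates the result only and does not use the GraphFrames API.

```python
# Ages and edges copied from the example graph.
ages = {"a": 34, "b": 36, "c": 30, "d": 29, "e": 32, "f": 36, "g": 60}
edges = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]

# Same conditions as the filters: a 'follow' edge whose source is younger
# than its destination.
subgraph_edges = [
    (src, dst, rel) for src, dst, rel in edges
    if rel == "follow" and ages[src] < ages[dst]
]

print(subgraph_edges)  # prints [('c', 'b', 'follow'), ('e', 'f', 'follow')]
```

Only two edges remain. Because the subgraph is built with the full vertex set, every vertex is still present even when none of its edges survive.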

Standard graph algorithms

GraphFrames comes with a number of standard graph algorithms built in:

  • Breadth-first search (BFS)
  • Connected components
  • Strongly connected components
  • Label Propagation Algorithm (LPA)
  • PageRank
  • Shortest paths
  • Triangle count

Breadth-first search (BFS)

Search from “Esther” for users of age < 32.

paths = g.bfs("name = 'Esther'", "age < 32")
display(paths)

A user may also limit the search by edge filters or maximum path lengths.

filteredPaths = g.bfs(
  fromExpr = "name = 'Esther'",
  toExpr = "age < 32",
  edgeFilter = "relationship != 'friend'",
  maxPathLength = 3)
display(filteredPaths)
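The two searches above can be traced on the example data with a small plain-Python breadth-first search. This is a sketch of the idea and not the GraphFrames implementation. `bfs_paths` and its parameters are hypothetical names for this illustration. It assumes that all paths of the shortest matching length are returned.

```python
# Vertex attributes and edges copied from the example graph.
people = {
    "a": ("Alice", 34), "b": ("Bob", 36), "c": ("Charlie", 30), "d": ("David", 29),
    "e": ("Esther", 32), "f": ("Fanny", 36), "g": ("Gabby", 60),
}
edges = [
    ("a", "b", "friend"), ("b", "c", "follow"), ("c", "b", "follow"),
    ("f", "c", "follow"), ("e", "f", "follow"), ("e", "d", "friend"),
    ("d", "a", "friend"), ("a", "e", "friend"),
]


def bfs_paths(is_start, is_target, edge_ok=lambda edge: True, max_len=10):
    """Return the shortest paths (lists of vertex ids) from start to target vertices."""
    paths = [[v] for v in sorted(people) if is_start(v)]
    for _ in range(max_len + 1):
        hits = [p for p in paths if is_target(p[-1])]
        if hits:
            return hits  # stop at the first (shortest) length that reaches a target
        # Extend every path by one admissible outgoing edge.
        paths = [p + [e[1]] for p in paths for e in edges
                 if e[0] == p[-1] and edge_ok(e)]
    return []


def from_esther(v):
    return people[v][0] == "Esther"


def younger_than_32(v):
    return people[v][1] < 32


print(bfs_paths(from_esther, younger_than_32))  # prints [['e', 'd']]
print(bfs_paths(from_esther, younger_than_32,
                edge_ok=lambda e: e[2] != "friend", max_len=3))  # prints [['e', 'f', 'c']]
```

Without the edge filter, David is reached in one hop over a 'friend' edge. Once 'friend' edges are excluded, the shortest match is the two-hop path through Fanny to Charlie.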

Connected components

Compute the connected component membership of each vertex and return a DataFrame of vertices with a component ID assigned to each vertex.

result = g.connectedComponents()
display(result)
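As an illustration of what component membership means on the example graph, the plain-Python sketch below groups the vertices with a small union-find, ignoring edge direction. It is not the algorithm GraphFrames runs. Labelling each component by its smallest vertex ID is an assumption made for this sketch; GraphFrames assigns its own component IDs.

```python
vertex_ids = ["a", "b", "c", "d", "e", "f", "g"]
edges = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

parent = {v: v for v in vertex_ids}


def find(v):
    """Follow parent links to the representative of v's set, halving the path."""
    while parent[v] != v:
        parent[v] = parent[parent[v]]
        v = parent[v]
    return v


for src, dst in edges:
    root_s, root_d = find(src), find(dst)
    if root_s != root_d:
        # Attach the larger root to the smaller one so that the representative
        # is always the smallest vertex id in the component.
        parent[max(root_s, root_d)] = min(root_s, root_d)

component = {v: find(v) for v in vertex_ids}
print(component)
```

On this graph the sketch yields two components: vertices a through f end up together, and the isolated vertex g forms a component of its own.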

Strongly connected components

Compute the strongly connected component (SCC) of each vertex and return a DataFrame of vertices with each vertex assigned to the SCC containing it.

result = g.stronglyConnectedComponents(maxIter=10)
display(result.select("id", "component"))
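Strong connectivity takes edge direction into account: two vertices share an SCC only if each can reach the other. The plain-Python sketch below applies that definition directly through mutual reachability. This is simple but not efficient, and it is not the GraphFrames algorithm. Using the smallest member ID as the component label is an assumption of this sketch.

```python
vertex_ids = ["a", "b", "c", "d", "e", "f", "g"]
edges = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]

adjacency = {}
for src, dst in edges:
    adjacency.setdefault(src, []).append(dst)


def reachable(start):
    """Return every vertex reachable from start along directed edges."""
    seen = {start}
    stack = [start]
    while stack:
        v = stack.pop()
        for w in adjacency.get(v, ()):
            if w not in seen:
                seen.add(w)
                stack.append(w)
    return seen


reach = {v: reachable(v) for v in vertex_ids}

# u and v are in the same SCC when each one can reach the other.
scc = {v: min(u for u in reach[v] if v in reach[u]) for v in vertex_ids}
print(scc)
```

The cycle a → e → d → a forms one SCC and the pair b, c forms another. Vertices f and g are each alone, since no path leads back to f and g has no edges at all.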

Label Propagation

Run the static Label Propagation Algorithm (LPA) to detect communities in networks.

Each node in the network is initially assigned to its own community. At every superstep, nodes send their community affiliation to all neighbors and update their state to the most frequent (mode) community affiliation among the incoming messages.

LPA is a standard community detection algorithm for graphs. It is computationally inexpensive, although (1) convergence is not guaranteed and (2) one can end up with trivial solutions (all nodes are assigned to a single community).

result = g.labelPropagation(maxIter=5)
display(result)
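The superstep described above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not the GraphFrames implementation. It assumes that labels travel in both directions along each edge, that all vertices update at the same time, and that ties go to the smallest label. `label_propagation` is a hypothetical name.

```python
from collections import Counter

vertex_ids = ["a", "b", "c", "d", "e", "f", "g"]
edges = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]


def label_propagation(vertex_ids, edges, max_iter=5):
    labels = {v: v for v in vertex_ids}  # each vertex starts in its own community
    neighbors = {v: [] for v in vertex_ids}
    for src, dst in edges:
        # Assumption: a label is sent in both directions along every edge.
        neighbors[src].append(dst)
        neighbors[dst].append(src)
    for _ in range(max_iter):
        new_labels = {}
        for v in vertex_ids:
            incoming = Counter(labels[n] for n in neighbors[v])
            if not incoming:
                new_labels[v] = labels[v]  # no messages: keep the current label
                continue
            top = max(incoming.values())
            # Assumption: ties are broken by taking the smallest label.
            new_labels[v] = min(l for l, c in incoming.items() if c == top)
        labels = new_labels  # synchronous update: all vertices switch at once
    return labels


labels = label_propagation(vertex_ids, edges, max_iter=5)
print(labels)
```

Because the outcome depends on the tie-breaking rule and on the iteration count, the labels produced here need not match those of GraphFrames. The isolated vertex g receives no messages and therefore keeps its own label.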

PageRank

Identify important vertices in a graph based on connections.

# Run PageRank until the ranks converge to within the given tolerance.
results = g.pageRank(resetProbability=0.15, tol=0.01)
display(results.vertices)
display(results.edges)
# Run PageRank for a fixed number of iterations.
g.pageRank(resetProbability=0.15, maxIter=10)
# Run PageRank personalized for vertex "a"
g.pageRank(resetProbability=0.15, maxIter=10, sourceId="a")
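To show how resetProbability and maxIter interact, the following plain-Python sketch runs a fixed number of iterations of one common, unnormalized PageRank recurrence on the example graph. It assumes every rank starts at 1.0 and that each vertex splits its rank evenly over its outgoing edges. The scores GraphFrames reports may be scaled differently, and the personalized variant is not covered. `pagerank` is a hypothetical name.

```python
vertex_ids = ["a", "b", "c", "d", "e", "f", "g"]
edges = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]


def pagerank(vertex_ids, edges, reset_probability=0.15, max_iter=10):
    out_degree = {v: 0 for v in vertex_ids}
    for src, _ in edges:
        out_degree[src] += 1
    ranks = {v: 1.0 for v in vertex_ids}  # assumption: uniform starting rank of 1.0
    for _ in range(max_iter):
        incoming = {v: 0.0 for v in vertex_ids}
        for src, dst in edges:
            # Each vertex shares its rank equally among its outgoing edges.
            incoming[dst] += ranks[src] / out_degree[src]
        ranks = {
            v: reset_probability + (1 - reset_probability) * incoming[v]
            for v in vertex_ids
        }
    return ranks


ranks = pagerank(vertex_ids, edges, reset_probability=0.15, max_iter=10)
for v in sorted(ranks, key=ranks.get, reverse=True):
    print(v, round(ranks[v], 3))
```

In this sketch b and c end up with the highest scores, because rank flows into their two-vertex cycle and never leaves it. Vertex g has no incoming edges and stays at the reset value of 0.15.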

Shortest paths

Compute the shortest paths from each vertex to the given set of landmark vertices, where landmarks are specified by vertex ID.

results = g.shortestPaths(landmarks=["a", "d"])
display(results)
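The distances for the two landmarks can be worked out by hand with a breadth-first search over reversed edges. The plain-Python sketch below does this, counting hops along edge direction. It illustrates the expected result, not the GraphFrames implementation. `hops_to_landmark` is a hypothetical helper.

```python
from collections import deque

vertex_ids = ["a", "b", "c", "d", "e", "f", "g"]
edges = [("a", "b"), ("b", "c"), ("c", "b"), ("f", "c"),
         ("e", "f"), ("e", "d"), ("d", "a"), ("a", "e")]


def hops_to_landmark(landmark, edges):
    """Search backwards from the landmark; return hop counts for vertices that reach it."""
    reverse = {}
    for src, dst in edges:
        reverse.setdefault(dst, []).append(src)
    dist = {landmark: 0}
    queue = deque([landmark])
    while queue:
        v = queue.popleft()
        for u in reverse.get(v, ()):
            if u not in dist:
                dist[u] = dist[v] + 1
                queue.append(u)
    return dist


# One map per vertex: landmark id -> number of hops; unreachable landmarks are omitted.
distances = {v: {} for v in vertex_ids}
for landmark in ["a", "d"]:
    for v, hops in hops_to_landmark(landmark, edges).items():
        distances[v][landmark] = hops

for v in vertex_ids:
    print(v, distances[v])
```

Only a, d, and e can reach the landmarks, since they lie on the cycle a → e → d → a. The maps for b, c, f, and g stay empty because no directed path leads from them to either landmark.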