メインコンテンツまでスキップ

GraphFrames ユーザーガイド - Scala

この記事では、 GraphFrames ユーザーガイドの例を紹介します。

Scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

GraphFrames の作成

GraphFrames は、頂点とエッジ データフレームから作成できます。

  • 頂点 データフレーム: 頂点 データフレーム には、特別な列が含まれている必要があります 名前付き id グラフの各頂点の一意の ID を指定します。
  • エッジ データフレーム: エッジ データフレーム には、次の 2 つの特別な列が含まれている必要があります。 src (エッジのソース頂点 ID) と dst (ターゲット頂点 ID の edge)です。

どちらの データフレーム も、他の任意の列を持つことができます。 これらの列は、 頂点属性とエッジ属性を表します。

頂点とエッジを作成してください

Scala
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")

これらの頂点と辺からグラフを作成しましょう:

Scala
val g = GraphFrame(v, e)
Scala
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

基本的なグラフ クエリと データフレーム クエリ

GraphFrames は、ノードの次数などの単純なグラフクエリーを提供します。

また、GraphFramesはグラフを頂点と辺 データフレームのペアとして表すため、頂点と辺の データフレームに直接強力なクエリーを簡単に作成できます。 これらの データフレーム は、GraphFrames の頂点フィールドとエッジ フィールドとして使用できます。

Scala
display(g.vertices)
Scala
display(g.edges)

頂点の入力次数:

Scala
display(g.inDegrees)

頂点の出力次数:

Scala
display(g.outDegrees)

頂点の次数:

Scala
display(g.degrees)

頂点 データフレーム に対して直接クエリを実行できます。 たとえば、 グラフ内の最年少の人の年齢を見つけることができます。

Scala
val youngest = g.vertices.groupBy().min("age")
display(youngest)

同様に、エッジ データフレーム でクエリを実行できます。 たとえば、let グラフ内の「フォロー」関係の数をカウントします。

Scala
val numFollows = g.edges.filter("relationship = 'follow'").count()

モチーフの発見

モチーフを使用して、エッジと頂点を含むより複雑な関係を構築します。 次のセルは、頂点間の両方向のエッジを持つ頂点のペアを検索します。 結果は データフレーム で、列名はモチーフ キーです。

API の詳細については、 GraphFrames ユーザー ガイド を参照してください。

Scala
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

結果はデータフレームであるため、モチーフの上により複雑なクエリを作成できます。 一人の人が30歳以上であるすべての相互関係を見つけましょう。

Scala
val filtered = motifs.filter("b.age > 30")
display(filtered)

ステートフル クエリ

ほとんどのモチーフクエリーは、上記の例のようにステートレスで表現が簡単です。 次の例は、モチーフのパスに沿って状態を運ぶ、より複雑なクエリーを示しています。 GraphFrames モチーフの検出と結果のフィルターを組み合わせて、フィルターがシーケンス操作を使用して一連の データフレーム 列を構築することで、これらのクエリーを表現します。

たとえば、4 つの頂点のチェーンを次のように識別するとします。 一連の関数によって定義されるプロパティ。 つまり、チェーン間で 4つの頂点のうち a->b->c->d、一致するチェーンのサブセットを特定します この複雑なフィルターは、次の特徴を備えています。

  • パス上の状態を初期化します。
  • 頂点 a に基づいて状態を更新します。
  • 頂点 b に基づいて状態を更新します。
  • cとdなど。
  • 最終状態が何らかの条件に一致する場合、フィルターはチェーンを受け入れます。

次のコード スニペットは、このプロセスを示しています。 3 つのエッジのうち少なくとも 2 つが "friend" であるような 4 つの頂点のチェーン 関係。 この例では、状態は現在のカウント 「友達」のエッジ。一般に、任意の データフレーム 列を指定できます。

Scala
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

サブグラフ

GraphFrames 、エッジと頂点でフィルタリングすることにより、サブグラフを構築するための APIs を提供します。 これらのフィルター 一緒に構成できます。 たとえば、次のサブグラフには、友達で 30歳以上の方

Scala
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()

複雑なトリプレットフィルター

次の例は、エッジとその "src" で動作する 3 連のフィルターに基づいてサブグラフを選択する方法を示しています と "dst" 頂点。 この例を拡張して、より複雑なモチーフを使用して 3 連符を超えるのは簡単です。

Scala
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
Scala
display(g2.vertices)
Scala
display(g2.edges)

標準グラフアルゴリズム

このセクションでは、GraphFrames に組み込まれている標準のグラフ アルゴリズムについて説明します。

幅優先検索 (BFS)

「エスター」から32<歳までのユーザーを検索してください。

Scala
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

検索では、エッジ フィルターと最大パス長も制限される場合があります。

Scala
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)

接続コンポーネント

各頂点の接続されたコンポーネントのメンバーシップをコンピュートし、 グラフにコンポーネント ID が割り当てられています。

Scala
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

強結合コンポーネント

各頂点の強結合成分(SCC)をコンピュートして戻します 各頂点が、その頂点を含む SCC に割り当てられたグラフ。

Scala
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

ラベルの伝播

静的ラベル伝播アルゴリズムを実行して、コミュニティを検出します。 ネットワーク。

ネットワーク内の各ノードは、最初は独自のコミュニティに割り当てられます。 で スーパーステップごとに、ノードはコミュニティへの所属をすべての隣人に送信します そして、その状態を着信のモードコミュニティ所属に更新します メッセージ。

LPA は、グラフの標準的なコミュニティ検出アルゴリズムです。 計算的に安価であり、 ただし、(1)収束は保証されておらず、(2)些細なソリューション(すべてのノードが単一のコミュニティに識別される)で終わる可能性があります。

Scala
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

ページランク

接続に基づいてグラフ内の重要な頂点を特定します。

Scala
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
Scala
display(results.edges)
Scala
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
Scala
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

最短パス

コンピュート 指定されたランドマーク頂点のセットへの最短パス (ランドマークは頂点 ID で指定)。

Scala
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

トライアングルカウント

コンピュート 各頂点を通過する三角形の数。

Scala
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()