tonglin0325的个人主页

GraphX学习笔记——Programming Guide

学习的资料是官网的Programming Guide

1
2
https://spark.apache.org/docs/latest/graphx-programming-guide.html

 首先是GraphX的简介

GraphX是Spark中专门负责图和图并行计算的组件。

GraphX通过引入了图形概念来继承了Spark RDD:一个连接节点和边的有向图

为了支持图计算,GraphX引入了一些算子: subgraphjoinVertices, and aggregateMessages

和 Pregel API,此外还有一些algorithmsbuilders 来简化图分析任务。

 

关于构建 节点Vertex边Edge

1.如果需要将节点定义成一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package graphx

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}

/**
* Created by common on 18-1-22.
*/

// 抽象节点
class VertexProperty()
// User节点
case class UserProperty(val name: String) extends VertexProperty
// Product节点
case class ProductProperty(val name: String, val price: Double) extends VertexProperty

object GraphxLearning {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("GraphX").setMaster("local")
val sc = new SparkContext(conf)

// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

}
}

和节点一样,边也可以定义成一个class,同时Graph类需要和定义的节点和边的类型相对应

1
2
3
4
5
class Graph[VD, ED] {    // VD表示节点类型,ED表示边类型
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}

 

2.如果节点的类型比较简单,例如只是一个String或者(String,String),就不需要定义成一个类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package graphx

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}

/**
* Created by common on 18-1-22.
*/
object GraphxLearning {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("GraphX").setMaster("local")
val sc = new SparkContext(conf)

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
//Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")

// 使用多个RDDs建立一个Graph,Graph的类型分别是节点加上边的类型,有两种节点,一种有ID,一种没有
val srcGraph: Graph[(String, String), String] = Graph(users, relationships, defaultUser)

}
}

 的一些算子
|

图信息 

numEdges: Long计算整个图中边的数目

计算整个图中边的数目

numVertices: Long计算整个图中顶点的数目

计算整个图中顶点的数目

inDegrees: VertexRDD[Int]计算所有点的入度,若顶点无入度,则不会出现在结果中

计算所有点的入度,若顶点无入度,则不会出现在结果中

outDegrees: VertexRDD[Int]计算所有点的出度,和inDegrees相似,若顶点无出度则不会出现在结果中

计算所有点的出度,和inDegrees相似,若顶点无出度则不会出现在结果中

degrees: VertexRDD[Int]计算所有顶点的出入度之和,孤立的顶点(无边与之相连)不会出现在结果中

计算所有顶点的出入度之和,孤立的顶点(无边与之相连)不会出现在结果中

查看图中的集合  ` vertices: VertexRDD[VD]` 节点`VertexRDD` ` edges: EdgeRDD[ED] ` 边EdgeRDD `triplets: RDD[EdgeTriplet[VD, ED]]`三元组RDD 图存储  `persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]`  `cache(): Graph[VD, ED]`  `unpersistVertices(blocking: Boolean = true): Graph[VD, ED]`  `操作partition的算子`  `partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]`  操作Vertex和Edge的算子,以生成新的Graph  `mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]`  ` mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]`  `mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]`  `mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]`  `mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) : Graph[VD, ED2]`  修改图结构的算子   `reverse: Graph[VD, ED]`  改变有向边的方向

 

`subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED]`子图 `mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]`  `groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]` graphx中两个节点之间可以存在多条边,可以用于将这多条边合并 Join算子  `joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]`使用顶点的更新数据生成新的顶点数据。将图数据与输入数据做内连接操作,过滤输入数据中不存在的顶点,并对连接结果使用指定的UDF进行计算,若输入数据中未包含图中某些顶点的更新数据,则在新图中使用顶点的旧数据 `outerJoinVertices[U, VD2](other: RDD[(VertexId, U)]) (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED]`  聚合算子  collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]收集每个顶点的相邻顶点的ID数据,edgeDirection用来控制收集的方向

收集每个顶点的相邻顶点的ID数据,edgeDirection用来控制收集的方向

collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]收集每个顶点的相邻顶点的数据,当图中顶点的出入度较大时,可能会占用很大的存储空间,参数edgeDirection用于控制收集方向

收集每个顶点的相邻顶点的数据,当图中顶点的出入度较大时,可能会占用很大的存储空间,参数edgeDirection用于控制收集方向

`aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A]`  `迭代图并行计算的算子`  `pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)], mergeMsg: (A, A) => A) : Graph[VD, ED]`  `基础图算法`  `pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]` 

 

`connectedComponents(): Graph[VertexId, ED]` 联通,无向联通的节点将会有一个相同的VertexId `triangleCount(): Graph[Int, ED]`  `stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] }` 强联通,有向联通的节点将会有一个相同的VertexId LabelPropagation 标签传播算法算法终止条件:它要求所有的node都满足,node的label一定是它的邻居label中出现次数最多的(或最多的之一),这意味着,每个node的邻居中,和它处于同一个community的数量一定大于等于处于其它community的数量

 标签传播算法

ShortestPaths 最短路径算法 SVDPlusPlus SVD算法 | |  | |