基于spark的GraphX如何使用

72次阅读
没有评论

共计 4254 个字符,预计需要花费 11 分钟才能阅读完成。

这篇文章主要介绍“基于 spark 的 GraphX 如何使用”,在日常操作中,相信很多人在基于 spark 的 GraphX 如何使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”基于 spark 的 GraphX 如何使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

基于 Spark 的 GraphX.pptx

1. Property Graph:用户定义的有向图,图中的每个顶点和每条边都附加一个用户定义的对象,允许在两个顶点之间并行存在多条边。每个顶点都具有一个 64 位的唯一标识(VertexID),GraphX 并不强制 VertexID 有序。每条边则由起始和终止 VertexID 标识。

Graph 具有两个参数化的类型:Vertex(VD)和 Edge(ED),分别对应附加在顶点和边上的对象。当 VD 和 ED 为基本的数据类型时,Graph 将把它们保存在数组中。

Graph 和 RDD 一样(spark 的基本数据类型,Resilient Distributed Dataset),创建之后不可再改变,分布式存储在集群上,并且具有容错能力。对图中结构和值的改变,都将需要产生一个新的 Graph 对象,新的 Graph 将与之前的 Graph 共享大部分数据结构。Graph 通过顶点分割方法,分割在不同的机器上。任何数据分片所在机器的失败都将引发该数据分片在其它机器上重新创建。

逻辑上 Graph 包含 VertexRDD 和 EdgeRDD,即:

 class Graph[VD,ED] {

val vertices: VertexRDD[VD]

val edges: EdgeRDD[ED,VD]

}

其中,VertexRDD[VD]和 EdgeRDD[ED,VD]分别是 RDD[VertexID,VD]和 RDD[Edge[ED]]经过优化(extends)后的版本,提供了图计算相关功能,并做了内部优化。

2. Graph 类的成员变量

class Graph[VD, ED] {
 //Graph 的基本信息:边数,顶点数,入度,出度,度
 val numEdges: Long
 val numVertices: Long
 val inDegrees: VertexRDD[Int]
 val outDegrees: VertexRDD[Int]
 val degrees: VertexRDD[Int]
 //Graph 的顶点 RDD,边 RDD,以及三元组 RDD
 val vertices: VertexRDD[VD]
 val edges: EdgeRDD[ED, VD]
 val triplets: RDD[EdgeTriplet[VD, ED]]
 }

class Graph[VD, ED] { def mapVertices[VD2](map: (VertexId, VD) =  VD2): Graph[VD2, ED]
 def mapEdges[ED2](map: Edge[ED] =  ED2): Graph[VD, ED2]
 def mapTriplets[ED2](map: EdgeTriplet[VD, ED] =  ED2): Graph[VD, ED2]
上面的每个操作都将改变 Graph 中 vertex 和 edge 特性,并产生一个新的 Graph

class Graph[VD, ED] { def reverse: Graph[VD, ED]
 def subgraph(epred: EdgeTriplet[VD,ED] =  Boolean,
 vpred: (VertexId, VD) =  Boolean): Graph[VD, ED]
 def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
 def groupEdges(merge: (ED, ED) =  ED): Graph[VD,ED]
}

reverse 操作返回一个愿图中边的方向反转的新图。由于该操作没有改变顶点和边的特性,所以不需要数据的移动

subgraph 操作返回连接的点和边满足 vpred 和 epred 构成的子图

mask 操作返回两个图相交的子图,groupEdges 操作合并重复的边

5. 连接操作

class Graph[VD, ED] { def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) =  VD)
 : Graph[VD, ED]
 def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) =  VD2)
 : Graph[VD2, ED]}

joinVertices 操作,连接顶点和输入的 RDD,然后对连接得到的顶点,应用用户定义的 map 函数,若在 RDD 中没有匹配连接的顶点,则保持顶点原有的值不变

outerjoinVertices 类似于 joinVertices,只是用户定义的 map 函数应用于所有的顶点,且可以改变顶点的类型

其中 f(a)(b)的写法类似于 f(a,b),只是参数 b 的类型取决于 a

6. 邻域聚合

GraphX 中,经过深度优化的核心聚合操作是 mapReduceTriplets

class Graph[VD, ED] { def mapReduceTriplets[A]( map: EdgeTriplet[VD, ED] =  Iterator[(VertexId, A)],
 reduce: (A, A) =  A)
 : VertexRDD[A]}

mapReduceTriplets 接收一个用户定义的 map 函数,应用于 Graph 的每一个三元组,产生消息 (message) 給三元组中的任意顶点。为了便于预先聚合优化,暂时只支持給其中一个顶点发送消息。随后,用户定义的 reduce 函数结合发送给每一个顶点的消息。最终返回 VertexRDD[A],没有收到消息的顶点不包含在该结果之中

mapRedeceTriplets 还包含一个可选的参数:activeSetOpt,指定执行 map 操作的顶点集合

7. 在 spark 中,RDD 默认是不会一直保存在内存中的,为了避免重复计算,需要显式的指定:Graph.cache(),显式指定保存在内存中的 RDD 只有在系统内存不足时,才会强制采用 LRU(least recently uesd)方式调出内存。然而,对于迭代计算则应该 uncaching 迭代产生的中间数据,因此,在进行图的迭代计算时,推荐采用 Pregel API,它会自动的 unpersist 不需要的中间结果。

8. GraphX Pregel API

图天然就是一个递归的数据结构,图中顶点的特性取决于它们邻域顶点的特性,反过来又影响其邻域顶点的特性。因此,很多重要的图算法都需要迭代计算每个顶点的特性,直到收敛。GraphX 提供类似于 Pregel 的操作,其是 Google Pregel 和 GraphLab 框架抽象的结合。

class GraphOps[VD, ED] { def pregel[A]
 (initialMsg: A, // 初始消息,最大迭代次数,消息传递方向
 maxIter: Int = Int.MaxValue,
 activeDir: EdgeDirection = EdgeDirection.Out)
 (vprog: (VertexId, VD, A) =  VD,
 sendMsg: EdgeTriplet[VD, ED] =  Iterator[(VertexId, A)],
 mergeMsg: (A, A) =  A)
 : Graph[VD, ED] = { var g = mapVertices( (vid, vdata) =  vprog(vid, vdata, initialMsg) ).cache()
 var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
 var activeMessages = messages.count()
 var i = 0
 while (activeMessages   0   i   maxIterations) { val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
 g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) =  newOpt.getOrElse(old) }.cache()
 messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
 activeMessages = messages.count()
 i += 1
 }
 g
 }
}

9. 创建 Graph

GraphX 提供了根据顶点和边 RDD 或者从磁盘上创建图的方法。默认情况下,图构建器不会重新分割图的边,即边将留在它们起始分片所在机器。然而,Graph.groupEdges 要求将图重新分片,因为该操作假设相同的边处在相同的分片中。所以需先调用 Graph.partitionBy 操作。

GraphLoader.edgeListFile 操作,从磁盘加载图,解析 sourceVD destinationVD,跳过 #开始的注释行,顶点值默认为 1

10.VertexRDD 和 EdgeRDD

GraphX 提供 Graph 的 VertexRDD 和 EdgeRDD,由于 GraphX 对顶点和边的数据结构进行了优化,因此还提供一些额外的功能。Vertex[A]继承自 RDD[VertexID,A],并且约束 VertexID 只能出现一次,采用哈希表的方式存储顶点属性 A。EdgeRDD 继承自 RDD[Edge[ED]]依据策略 PartitionStrategy,将边保存在分块中。在每个分块中,边的结构和属性保存在不同的结构中。

到此,关于“基于 spark 的 GraphX 如何使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计4254字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)