graphx 获取一度内集合 如:家庭关系

mac2025-09-18  38

 按个人需求所总结的,有错误还请指出,谢谢啦

需求是找到一度内的家庭关系,就比如爷爷A - 爸爸B - 儿子C - 孙子D,最终结果应该是以爸爸为首的爷爸儿(ABC),以儿子为首的爸儿孙(BCD)两个家庭,做到后面,发现就是判断集合的包含关系。

最开始思路是:后面有新思路

找到每个点自己一度内所有点信息,也就是以自己为首的家庭遍历每个边,根据两个点信息判断出两点的包含关系,也就是谁是当家的,当的哪一个家,(其实这里就是判断集合的包含关系了,和边类型、边方向没多大关系,也可以使用两层for循环求出结果,我这边数据量大,是借助的graphx)根据两点判断的信息,决定是否发送空信息,也就是如果是两个家庭,则不发,如果是包含关系,则向小家庭发空信息,告诉该家庭当家的不是你

根据以上思路在处理的结果中有重复数据,原因是类似family1类的家庭,可以通过对结果排序,去重处理,但是我觉得这会增加处理的工作量,比如遍历的次数多,这类数据很特殊,所以可以提前处理掉,我把整个过程分为两步:第一步求出family1,第二步求出family2

新思路如下

找到每个点自己一度内所有点信息,也就是以自己为首的家庭对家庭成员按大小排序,分组求和,也就是重复出现的次数利用组员数量和重复次数相等找到 family1 的家庭,并且仔细观察第二步的结果,可以发现,和为1的我们肯定是保留,但是对于大于1的,我们只随机保留一份即可,因为这种数据就类似于family2图中的(7,8)(33,34,35,36),这些点有相同的大家庭根据第三步的分析处理所有点,先过滤掉 family1 的点,在将重复次数大于1的随机保留一份,为1的全保留,得到一个点信息用新的点信息更新原始图graph,我这里的原始图的点属性都为空,如果你的不是,则先处理一下利用子图过滤掉不需要遍历的边遍历边的时候,向小家庭发空,得到所有小家庭利用得到的小家庭点信息更新第4步生成的点信息,得到 family2 的家庭至此,两类家庭都得到。

前后两次对比:通过提前处理,减少了很多边的遍历

 

 

 

 代码如下:

import org.apache.spark._ import org.apache.spark.graphx.{Edge, _} import org.apache.spark.rdd.RDD /** * 获取一度内组合关系,也就是大图中划分一度集合,例如:家庭关系 * */ object TestAppGraphX_family3_local { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf() sparkConf.setMaster("local") sparkConf.setAppName("TestAppGraphX_inout") val sc = new SparkContext(sparkConf) val edge = Array( Edge(31L, 32L, "1") , Edge(32L, 33L, "1") , Edge(33L, 34L, "1") , Edge(34L, 35L, "1") , Edge(35L, 36L, "1") , Edge(36L, 32L, "1") , Edge(36L, 33L, "1") , Edge(36L, 34L, "1") , Edge(34L, 32L, "1") , Edge(35L, 32L, "1") , Edge(35L, 33L, "1") , Edge(23L, 21L, "2") , Edge(10L, 11L, "2") , Edge(10L, 13L, "2") , Edge(13L, 11L, "2") , Edge(1L, 2L, "1") , Edge(2L, 6L, "2") , Edge(2L, 5L, "3") , Edge(7L, 1L, "2") , Edge(8L, 1L, "3") , Edge(9L, 5L, "2") , Edge(5L, 6L, "2") , Edge(7L, 8L, "2") , Edge(8L, 12L, "2") , Edge(7L, 12L, "2") ) val edgeRDD: RDD[Edge[String]] = sc.parallelize(edge) val graph: Graph[String, String] = Graph.fromEdges(edgeRDD, "") /** * 统计一度内,每个点拥有的所有点信息 */ val all_vertex = graph.aggregateMessages[String](edge => { edge.sendToSrc(edge.dstId.toString) edge.sendToDst(edge.srcId.toString) }, (a, b) => a + "," + b , tripletFields = TripletFields.All) .map(v => { (v._1, v._1 + "," + v._2) }) // all_vertex.foreach(println(_)) /** * 对每个点的所有点信息排序,分组 */ val reduceRDD = all_vertex .map(v => { val array = v._2.split(",") (array.sorted.mkString(","), (1, v._1.toString)) }) .reduceByKey((a, b) => { (a._1 + b._1, a._2 + "," + b._2) }).cache() /** * 找出所有独立的,全闭合的,彼此相连的组合关系,与方向无关,例如: * 两点:1 - 2 * 三点:1 - 2, 2 - 3, 3 - 1 * 四点:1 - 2, 2 - 3, 3 - 4, 4 - 1, 1 - 3, 2 - 4 * 依次类推... */ val res1RDD = reduceRDD.filter(v => { v._1.split(",").length.equals(v._2._1) }).map("family[1] : " + _._1) /** * 找出非独立的,全闭合的,彼此相连的组合关系,这里面会含有子集组合,比如 * 两个组合分别为: (1 2 3) 和 (1 2 3 4),理论上前一个属于后一个,我们要在结果集中去除前一个 * */ val base2RDD = reduceRDD.filter(v => { // 过滤掉独立闭合的组合 !v._1.split(",").length.equals(v._2._1) }).map(v => { if (v._2._1 == 1) { // 保留唯一组合,唯一组合会同时存在子集和父集 (v._2._2.toLong, v._1) } else { // 针对重复组合,随机保留一个 (v._2._2.split(",")(0).toLong, v._1) } }) println("==========去重后各点信息============") // base2RDD.foreach(println(_)) println("==========去重后各点信息============") /** * 更新图里的点信息 */ val graphM = graph.joinVertices(base2RDD)((id, tup, U) => U) /** * 获取图里的有父集的所有子集,并且都置为空 */ val subRDD = graphM // 在aggregateMessages之前,先去除不需要遍历的边,减少遍历次数,属于优化 .subgraph(epred = e => e.dstAttr != "" && e.srcAttr != "") .aggregateMessages[String](edge => { // 边上任意一点为空则不参与, // 该判断等同于上面的subgraph(epred = e => e.dstAttr != "" && e.srcAttr != "")操作 // if (edge.srcAttr != "" && edge.dstAttr != ""){ val srcArray = edge.srcAttr.split(",") val dstArray = edge.dstAttr.split(",") if (srcArray.length == 0 || dstArray.length == 0) { } // 按照源点,终点属性个数的长度分开遍历,可以减少遍历的次数,属于优化 else if (srcArray.length >= dstArray.length) { var bol = true // 判断终点是否属于源点,如果是,则向终点发空信息,如果不是,则两边都不发信息 for (i <- dstArray) { if (!srcArray.contains(i)) { bol = false } } if (bol) { edge.sendToDst("") } } else { var bol = true // 判断源点是否属于终点,如果是,则向源点发空信息,如果不是,则两边都不发信息 for (i <- srcArray) { if (!dstArray.contains(i)) { bol = false } } if (bol) { edge.sendToSrc("") } } // } }, (a, b) => "", tripletFields = TripletFields.All) println("==========待过滤所有子集============") // subRDD.foreach(println(_)) println("==========待过滤所有子集============") val res2RDD = base2RDD.leftOuterJoin(subRDD).filter(!_._2._2.isDefined).map("family[2] : " + _._2._1) res1RDD.union(res2RDD) .foreach(println(_)) sc.stop() } }

结果 :

family[1] : 24,25,26,27 family[1] : 10,11,13 family[1] : 21,23 family[2] : 31,32,33,34,35,36 family[2] : 1,2,7,8 family[2] : 1,12,7,8 family[2] : 2,5,6,9 family[2] : 1,2,5,6

 

最新回复(0)