按个人需求所总结的,有错误还请指出,谢谢啦
需求是找到一度内的家庭关系,就比如爷爷A - 爸爸B - 儿子C - 孙子D,最终结果应该是以爸爸为首的爷爸儿(ABC),以儿子为首的爸儿孙(BCD)两个家庭,做到后面,发现就是判断集合的包含关系。
最开始思路是:后面有新思路
找到每个点自己一度内所有点信息,也就是以自己为首的家庭遍历每个边,根据两个点信息判断出两点的包含关系,也就是谁是当家的,当的哪一个家,(其实这里就是判断集合的包含关系了,和边类型、边方向没多大关系,也可以使用两层for循环求出结果,我这边数据量大,是借助的graphx)根据两点判断的信息,决定是否发送
空信息,也就是如果是两个家庭,则不发,如果是包含关系,则向小家庭发空信息,告诉该家庭当家的不是你
根据以上思路在处理的结果中有重复数据,原因是类似family1类的家庭,可以通过对结果排序,去重处理,但是我觉得这会增加处理的工作量,比如遍历的次数多,这类数据很特殊,所以可以提前处理掉,我把整个过程分为两步:第一步求出family1,第二步求出family2
新思路如下
找到每个点自己一度内所有点信息,也就是以自己为首的家庭对家庭成员按大小排序,分组求和,也就是重复出现的次数利用组员数量和重复次数相等找到
family1 的家庭,并且仔细观察第二步的结果,可以发现,和为1的我们肯定是保留,但是对于大于1的,我们只随机保留一份即可,因为这种数据就类似于family2图中的(7,8)(33,34,35,36),这些点有相同的大家庭根据第三步的分析处理所有点,先过滤掉 family1 的点,在将重复次数大于1的随机保留一份,为1的全保留,得到一个点信息用新的点信息更新原始图graph,我这里的原始图的点属性都为空,如果你的不是,则先处理一下利用子图过滤掉不需要遍历的边遍历边的时候,向小家庭发空,得到所有小家庭利用得到的小家庭点信息更新第4步生成的点信息,得到
family2 的家庭至此,两类家庭都得到。
前后两次对比:通过提前处理,减少了很多边的遍历
代码如下:
import org.apache.spark._
import org.apache.spark.graphx.{Edge, _}
import org.apache.spark.rdd.RDD
/**
* 获取一度内组合关系,也就是大图中划分一度集合,例如:家庭关系
*
*/
object TestAppGraphX_family3_local {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
sparkConf.setMaster("local")
sparkConf.setAppName("TestAppGraphX_inout")
val sc = new SparkContext(sparkConf)
val edge = Array(
Edge(31L, 32L, "1")
, Edge(32L, 33L, "1")
, Edge(33L, 34L, "1")
, Edge(34L, 35L, "1")
, Edge(35L, 36L, "1")
, Edge(36L, 32L, "1")
, Edge(36L, 33L, "1")
, Edge(36L, 34L, "1")
, Edge(34L, 32L, "1")
, Edge(35L, 32L, "1")
, Edge(35L, 33L, "1")
, Edge(23L, 21L, "2")
, Edge(10L, 11L, "2")
, Edge(10L, 13L, "2")
, Edge(13L, 11L, "2")
, Edge(1L, 2L, "1")
, Edge(2L, 6L, "2")
, Edge(2L, 5L, "3")
, Edge(7L, 1L, "2")
, Edge(8L, 1L, "3")
, Edge(9L, 5L, "2")
, Edge(5L, 6L, "2")
, Edge(7L, 8L, "2")
, Edge(8L, 12L, "2")
, Edge(7L, 12L, "2")
)
val edgeRDD: RDD[Edge[String]] = sc.parallelize(edge)
val graph: Graph[String, String] = Graph.fromEdges(edgeRDD, "")
/**
* 统计一度内,每个点拥有的所有点信息
*/
val all_vertex = graph.aggregateMessages[String](edge => {
edge.sendToSrc(edge.dstId.toString)
edge.sendToDst(edge.srcId.toString)
}, (a, b) => a + "," + b
, tripletFields = TripletFields.All)
.map(v => {
(v._1, v._1 + "," + v._2)
})
// all_vertex.foreach(println(_))
/**
* 对每个点的所有点信息排序,分组
*/
val reduceRDD = all_vertex
.map(v => {
val array = v._2.split(",")
(array.sorted.mkString(","), (1, v._1.toString))
})
.reduceByKey((a, b) => {
(a._1 + b._1, a._2 + "," + b._2)
}).cache()
/**
* 找出所有独立的,全闭合的,彼此相连的组合关系,与方向无关,例如:
* 两点:1 - 2
* 三点:1 - 2, 2 - 3, 3 - 1
* 四点:1 - 2, 2 - 3, 3 - 4, 4 - 1, 1 - 3, 2 - 4
* 依次类推...
*/
val res1RDD = reduceRDD.filter(v => {
v._1.split(",").length.equals(v._2._1)
}).map("family[1] : " + _._1)
/**
* 找出非独立的,全闭合的,彼此相连的组合关系,这里面会含有子集组合,比如
* 两个组合分别为: (1 2 3) 和 (1 2 3 4),理论上前一个属于后一个,我们要在结果集中去除前一个
*
*/
val base2RDD = reduceRDD.filter(v => {
// 过滤掉独立闭合的组合
!v._1.split(",").length.equals(v._2._1)
}).map(v => {
if (v._2._1 == 1) {
// 保留唯一组合,唯一组合会同时存在子集和父集
(v._2._2.toLong, v._1)
} else {
// 针对重复组合,随机保留一个
(v._2._2.split(",")(0).toLong, v._1)
}
})
println("==========去重后各点信息============")
// base2RDD.foreach(println(_))
println("==========去重后各点信息============")
/**
* 更新图里的点信息
*/
val graphM = graph.joinVertices(base2RDD)((id, tup, U) => U)
/**
* 获取图里的有父集的所有子集,并且都置为空
*/
val subRDD = graphM
// 在aggregateMessages之前,先去除不需要遍历的边,减少遍历次数,属于优化
.subgraph(epred = e => e.dstAttr != "" && e.srcAttr != "")
.aggregateMessages[String](edge => {
// 边上任意一点为空则不参与,
// 该判断等同于上面的subgraph(epred = e => e.dstAttr != "" && e.srcAttr != "")操作
// if (edge.srcAttr != "" && edge.dstAttr != ""){
val srcArray = edge.srcAttr.split(",")
val dstArray = edge.dstAttr.split(",")
if (srcArray.length == 0 || dstArray.length == 0) {
}
// 按照源点,终点属性个数的长度分开遍历,可以减少遍历的次数,属于优化
else if (srcArray.length >= dstArray.length) {
var bol = true
// 判断终点是否属于源点,如果是,则向终点发空信息,如果不是,则两边都不发信息
for (i <- dstArray) {
if (!srcArray.contains(i)) {
bol = false
}
}
if (bol) {
edge.sendToDst("")
}
} else {
var bol = true
// 判断源点是否属于终点,如果是,则向源点发空信息,如果不是,则两边都不发信息
for (i <- srcArray) {
if (!dstArray.contains(i)) {
bol = false
}
}
if (bol) {
edge.sendToSrc("")
}
}
// }
}, (a, b) => "", tripletFields = TripletFields.All)
println("==========待过滤所有子集============")
// subRDD.foreach(println(_))
println("==========待过滤所有子集============")
val res2RDD = base2RDD.leftOuterJoin(subRDD).filter(!_._2._2.isDefined).map("family[2] : " + _._2._1)
res1RDD.union(res2RDD)
.foreach(println(_))
sc.stop()
}
}
结果 :
family[1] : 24,25,26,27
family[1] : 10,11,13
family[1] : 21,23
family[2] : 31,32,33,34,35,36
family[2] : 1,2,7,8
family[2] : 1,12,7,8
family[2] : 2,5,6,9
family[2] : 1,2,5,6