1.目的
找出人际关系圈中的三角环状关系(A->B,B->C,C->A)
2.素材
text1.txt
1 tom 2 jack friend
2 jack 3 sala friend
3 sala 1 tom friend
4 joy 1 tom friend
1 tom 4 joy friend
1 tom 4 joy friend
2 jack 5 Missing friend
3.代码
package testimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.graphx._import org.apache.spark.rdd.RDDobject join { def main(args: Array[String]) { //屏蔽日志 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) //设置运行环境 val conf = new SparkConf().setAppName("test1").setMaster("local") val sc = new SparkContext(conf) //引入text文件 val File:RDD[String] = sc.textFile("e:\\text1.txt") //创建两个列反向的RDD //aid1,aid2 val edgea = File.map { e => val fields = e.split(" ") (fields(0).toLong,fields(2).toLong) } //bid1,bid2 val edged = File.map { e => val fields = e.split(" ") (fields(2).toLong,fields(0).toLong) } //RDD一次关联 val edge_rel=edgea.join(edged).map{case (aid1,(aid2,did2))=>(did2,(aid1,aid2))} println("edge_rel:") println(edge_rel.collect.mkString("\n")) //RDD二次关联 val edge_final=edge_rel.join(edged).map{case (aid1,((aid2,aid3),aid4))=>(aid4,aid1,aid2,aid3)} println("edge_final:") println(edge_final.collect.mkString("\n")) //筛选有向环 val edge_match=edge_final.filter{case (id1,id2,id3,id4)=>id1==id4} println("edge_match:") println(edge_match.collect.mkString("\n")) //最终环状结构 val edge_tri=edge_match.map{case (id1,id2,id3,id4)=>(id1,id2,id3)} println("edge_tri:") println(edge_tri.collect.mkString("\n")) //环状结构去重 val edge_unique=edge_tri.filter{case (id1,id2,id3)=>id1
4.输出
edge_rel:
(1,(4,1))
(1,(4,1))
(3,(1,2))
(4,(1,2))
(3,(1,4))
(4,(1,4))
(3,(1,4))
(4,(1,4))
(2,(3,1))
(1,(2,3))
(1,(2,5))
edge_final:
(1,4,1,2)
(1,4,1,2)
(1,4,1,4)
(1,4,1,4)
(1,4,1,4)
(1,4,1,4)
(3,1,4,1)
(4,1,4,1)
(3,1,4,1)
(4,1,4,1)
(3,1,2,3)
(4,1,2,3)
(3,1,2,5)
(4,1,2,5)
(2,3,1,2)
(2,3,1,4)
(2,3,1,4)
(1,2,3,1)
edge_match:
(3,1,2,3)
(2,3,1,2)
(1,2,3,1)
edge_tri:
(3,1,2)
(2,3,1)
(1,2,3)
edge_unique:
(1,2,3)