spark-user mailing list archives

From man june <>
Subject looking for help using GraphX aggregateMessages
Date Fri, 31 Jul 2015 10:27:09 GMT
Dear list,
Hi, I am new to Spark and GraphX and have a little experience with Scala. I want to use GraphX
to calculate some basic statistics on Linked Open Data, which is basically a graph.
Suppose the graph contains only one type of edge, pointing from individuals to concepts,
and all edge labels are "type". I want to find all pairs of concepts that have at least
one individual linking to both of them. The following is my current solution, but sadly it doesn't work.
Could you please help me get this working? Or is there a better solution? Any help is appreciated!
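Stated without Spark, the goal is: for each individual, take every unordered pair of the concepts it is typed with, then count how many individuals produce each pair. The following is a minimal plain-Scala sketch of that logic (no GraphX), only meant as a reference for what a distributed version should return. The names `ConceptPairsLocal` and `pairCounts` are hypothetical, and the input map simply restates the example edges (individuals 1, 2, 3 are a, b, e; concepts 11 to 14 are A to D).

```scala
object ConceptPairsLocal {
  // Hypothetical helper: counts, for every unordered pair of concepts
  // (smaller id first), how many individuals are typed with both concepts.
  def pairCounts(typesOf: Map[Long, Seq[Long]]): Map[(Long, Long), Int] =
    typesOf.values.toList
      .flatMap(concepts => concepts.distinct.sorted.combinations(2).map { case Seq(a, b) => (a, b) })
      .groupBy(identity)
      .map { case (pair, occurrences) => pair -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // The example "type" edges, grouped as individual id -> concept ids.
    val typesOf = Map(1L -> Seq(11L, 14L, 13L), 2L -> Seq(11L, 12L), 3L -> Seq(11L, 13L))
    pairCounts(typesOf).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For the example data this yields ((11,12),1), ((11,13),2), ((11,14),1) and ((13,14),1): A and C share two individuals (a and e), every other pair shares one.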

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.graphx._
  import org.apache.spark.rdd.RDD
  import scala.collection.mutable.HashMap

  val conf = new SparkConf().setMaster("spark://MacBook-Pro:7077").setAppName("My App").setJars(...)
  val sc = new SparkContext(conf)
  // initialize individuals (in lower-case letters) and concepts (in upper-case letters)
  val users: RDD[(VertexId, String)] = sc.parallelize(Array(
    (1L, "a"), (2L, "b"), (3L, "e"), (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))
  // initialize "type" edges
  val relationships: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
    Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
    Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))
  val graph = Graph(users, relationships)
  val indTypes = graph.collectNeighborIds(EdgeDirection.Out)

  // seems to be stupid functions...
  // sendMsg: send the source individual's concepts, each with count 1, to the destination concept
  def mapUDF(triplet: EdgeContext[String, String, HashMap[Long, Int]]) = {
    val msg = indTypes.filter(pred => pred._1 == triplet.srcId).first()._2
      .aggregate(new HashMap[Long, Int])((a, b) => a += (b -> 1), (a, b) => a ++ b)
    triplet.sendToDst(msg)
  }
  // mergeMsg: add up the counts of two messages key by key
  def reduceUDF(a: HashMap[Long, Int], b: HashMap[Long, Int]): HashMap[Long, Int] =
    a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }

  var pairs = new HashMap[(Long, Long), Int]
  val results = graph.aggregateMessages[HashMap[Long, Int]](mapUDF, reduceUDF)
  results.foreach(result => {
    result._2.filter(p => p._1 != result._1).foreach(map => {
      val a = result._1
      val b = map._1
      if (!pairs.contains((a, b)) && !pairs.contains((b, a)))
        pairs += (a, b) -> map._2
    })
  })
  pairs.foreach(println(_))
The exceptions:

  TaskSetManager: Lost task 1.0 in stage 3.0 (TID 7, ...): java.lang.NullPointerException
      at atur.node.learner.Test$.mapUDF(SimpleDisjointnessLearner.scala:147)
      at atur.node.learner.Test$$anonfun$4.apply(SimpleDisjointnessLearner.scala:155)
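The NullPointerException inside `mapUDF` very likely comes from `indTypes`: it is itself an RDD, but `mapUDF` runs on the executors as part of `aggregateMessages`, and Spark only allows RDD transformations and actions to be invoked from the driver, not inside other RDD operations. A second, silent problem is `pairs`: `results.foreach` also runs on the executors, so the `+=` updates happen on executor-side copies of the map and are not sent back to the driver. Below is a minimal sketch of one way around both, assuming the GraphX API (`outerJoinVertices`, `aggregateMessages`, `collectNeighborIds`) and a local master for testing: each vertex's out-neighbour ids are attached as its attribute, so `sendMsg` only reads `ctx.srcAttr`, and the per-concept counts are turned into pairs with `flatMap` and brought back with `collect()`. The names `ConceptPairsGraphX`, `conceptPairs` and `exampleGraph` are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object ConceptPairsGraphX {
  // Hypothetical helper: returns ((conceptA, conceptB), n) where n individuals
  // have "type" edges to both concepts; each unordered pair appears once.
  def conceptPairs(graph: Graph[String, String]): Array[((VertexId, VertexId), Int)] = {
    // Store each vertex's out-neighbour ids as its attribute, so no RDD has
    // to be referenced from inside the sendMsg closure.
    val withTypes: Graph[Array[VertexId], String] =
      graph.outerJoinVertices(graph.collectNeighborIds(EdgeDirection.Out)) {
        (_, _, nbrs) => nbrs.getOrElse(Array.empty[VertexId])
      }

    // Each edge individual -> concept tells the concept about the individual's
    // other concepts; merging adds up the counts key by key.
    val cooccurring: VertexRDD[Map[VertexId, Int]] =
      withTypes.aggregateMessages[Map[VertexId, Int]](
        ctx => ctx.sendToDst(ctx.srcAttr.filter(_ != ctx.dstId).map(c => c -> 1).toMap),
        (a, b) => a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0)) }
      )

    // Keep each pair once (smaller id first) and collect to the driver instead
    // of mutating a driver-side map inside foreach.
    cooccurring.flatMap { case (concept, others) =>
      others.collect { case (other, n) if concept < other => (concept, other) -> n }
    }.collect().sortBy(_._1)
  }

  // The example data from the question: individuals a, b, e and concepts A to D.
  def exampleGraph(sc: SparkContext): Graph[String, String] = {
    val users: RDD[(VertexId, String)] = sc.parallelize(Seq(
      (1L, "a"), (2L, "b"), (3L, "e"), (11L, "A"), (12L, "B"), (13L, "C"), (14L, "D")))
    val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
      Edge(1L, 11L, "type"), Edge(1L, 14L, "type"), Edge(1L, 13L, "type"),
      Edge(2L, 11L, "type"), Edge(2L, 12L, "type"),
      Edge(3L, 11L, "type"), Edge(3L, 13L, "type")))
    Graph(users, relationships)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master for trying this out; use your cluster URL instead.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ConceptPairs"))
    conceptPairs(exampleGraph(sc)).foreach(println)
    sc.stop()
  }
}
```

On the example graph this should print ((11,12),1), ((11,13),2), ((11,14),1) and ((13,14),1). Keeping the neighbour ids in the vertex attributes keeps the lookup distributed; if the individual-to-concepts table is small, broadcasting `indTypes.collectAsMap()` and reading it inside `mapUDF` would be another option.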

Best wishes,
June