spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "吴 郎" <>
Subject Re: GraphX Pregel not update vertex state properly, cause messages loss
Date Thu, 24 Nov 2016 15:04:20 GMT
Thank you, Dale, I've realized in what situation this bug would be activated. Actually, it
seems that any user-defined class with dynamic fields (such Map, List...) could not be used
as message, or it'll lost in the next supersteps. to figure this out, I tried to deep-copy
an new message object everytime the vertex program runs, and it works till now, though it's
obviously not an elegant way. 

fuz woo
吴   郎


湖南省长沙市开福区 邮编:410073


------------------ Original ------------------
From: "Dale Wang"<>; 
Date: 2016年11月24日(星期四) 中午11:10
To: "吴 郎"<>; 
Cc: "user"<>; 
Subject: Re: GraphX Pregel not update vertex state properly, cause messages loss

The problem comes from the inconsistency between graph’s triplet view  and vertex view.
The message may not be lost but the message is just not  sent in sendMsgfunction because sendMsg
function gets wrong value  of srcAttr! 
 It is not a new bug. I met a similar bug that appeared in version 1.2.1  according to  JIAR-6378
before. I  can reproduce that inconsistency bug with a small and simple program  (See that
JIRA issue for more details). It seems that in some situation  the triplet view of a Graph
object does not update consistently with  vertex view. The GraphX Pregel API heavily relies
on  mapReduceTriplets(old)/aggregateMessages(new) API who heavily relies  on the correct behavior
of the triplet view of a graph. Thus this bug  influences on behavior of Pregel API.
 Though I cannot figure out why the bug appears either, but I suspect  that the bug has some
connection with the data type of the vertex  property. If you use primitive types such as
Double and Long, it is  OK. But if you use some self-defined type with mutable fields such
as  mutable Map and mutable ArrayBuffer, the bug appears. In your case I  notice that you
use JSONObject as your vertex’s data type. After  looking up the definition ofJSONObject,
JSONObject has a java map as  its field to store data which is mutable. To temporarily avoid
the bug,  you can modify the data type of your vertex property to avoid any  mutable data
type by replacing mutable data collection to immutable data  collection provided by Scala
and replacing var field to val field.  At least, that suggestion works for me.
 Zhaokang Wang

2016-11-18 11:47 GMT+08:00 fuz_woo <>:
hi,everyone, I encountered a strange problem these days when i'm attempting
 to use the GraphX Pregel interface to implement a simple
 single-source-shortest-path algorithm.
 below is my code:
 import org.apache.spark.graphx._
 import org.apache.spark.{SparkConf, SparkContext}
 object PregelTest {
   def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject,
 JSONObject] = {
     def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
       if ( msg < 0 ) {
         // init message received
         if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
         else attr.put("LENGTH", Integer.MAX_VALUE)
       } else {
         attr.put("LENGTH", msg+1)
     def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]):
 Iterator[(VertexId, Integer)] = {
       val len = triplet.srcAttr.getInteger("LENGTH")
       // send a msg if last hub is reachable
       if ( len<Integer.MAX_VALUE ) Iterator((triplet.dstId, len))
       else Iterator.empty
     def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
       if ( msg1 &lt; msg2 ) msg1 else msg2
     Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg,
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName(&quot;Pregel Test&quot;)
     conf.set(&quot;spark.master&quot;, &quot;local&quot;)
     val sc = new SparkContext(conf)
     // create a simplest test graph with 3 nodes and 2 edges
     val vertexList = Array(
       (0.asInstanceOf[VertexId], new JSONObject()),
       (1.asInstanceOf[VertexId], new JSONObject()),
       (2.asInstanceOf[VertexId], new JSONObject()))
     val edgeList = Array(
       Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new
       Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new
     val vertexRdd = sc.parallelize(vertexList)
     val edgeRdd = sc.parallelize(edgeList)
     val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)
     // run test code
     val lpa = run(g)
 and after i run the code, I got a incorrect result in which the vertex 2 has
 a &quot;LENGTH&quot; label valued &lt;Integer.MAX_VALUE>, it seems that the
 messages sent to vertex 2 was lost unexpectedly. I then tracked the debugger
 to file Pregel.scala,  where I saw the code:
 In the first iteration 0, the variable messages in line 138 is reconstructed
 , and then recomputed in line 143, in where activeMessages got a value 0,
 which means the messages is lost.
 then I set a breakpoint in line 138, and before its execution I execute an
 expression " g.triplets().collect() " which just collects the updated graph
 data. after I done this and execute the rest code, the messages is no longer
 empty and activeMessages got value 1 as expected.
 I have tested the code with both spark&&graphx 1.4 and 1.6 in scala 2.10,
 and got the same result.
 I must say this problem makes me really confused, I've spent almost 2 weeks
 to resolve it and I have no idea how to do it now. If this is not a bug, I
 totally can't understand why just executing a non-disturb expression (
 g.triplets().collect(), it just collect the data and do noting computing )
 could changing the essential, it's really ridiculous.
 View this message in context:
 Sent from the Apache Spark User List mailing list archive at
 To unsubscribe e-mail:
View raw message