spark-user mailing list archives

From Takeshi Yamamuro <linguin....@gmail.com>
Subject Re: [GraphX] Excessive value recalculations during aggregateMessages cycles
Date Sun, 15 Feb 2015 10:04:26 GMT
Hi,

I ran a quick and simple test, and it seems to me that the vertices below were
cached correctly.
Could you tell me how my code differs from yours?

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt")
        .cache

val g2 = g.outerJoinVertices(g.degrees)(
          (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
        .cache

g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
          (vid, old, msg) => msg.getOrElse(0))
        .mapVertices((vid, d) => Prog.processInt(d))
        .cache

g3.vertices.count

'g2.vertices.toDebugString' outputs:
----
(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
 |  VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at VertexRDDImpl.scala:121 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
    |  GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...


'g3.vertices.toDebugString' outputs:
----
(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
 |  VertexRDD MapPartitionsRDD[32] at mapPartitions at VertexRDDImpl.scala:96 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at VertexRDDImpl.scala:121 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPar...
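
toDebugString shows which RDDs hold cached partitions, but not how often the
join function actually ran. A minimal sketch of counting the invocations
directly is below. It assumes spark-shell (so `sc` already exists), the same
"../temp/graph.txt" edge list as above, and the Spark 1.x accumulator API;
the name `calls` and the inlined doubling function are only illustrative and
are not part of the test above.

```scala
import org.apache.spark.SparkContext._   // accumulator implicits (needed on Spark 1.2)
import org.apache.spark.graphx._

// Hypothetical counter: incremented every time the join function runs.
val calls = sc.accumulator(0)

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt").cache()

// Same shape as g2 above, with the counter added inside the closure.
val gCounted = g.outerJoinVertices(g.degrees) { (vid, old, msg) =>
  calls += 1
  msg.getOrElse(0) * 2
}.cache()

gCounted.vertices.count()        // first action: materializes the vertices
val afterFirst = calls.value

gCounted.vertices.count()        // later actions over the same graph
gCounted.triplets.count()
val afterLater = calls.value

// Equal numbers suggest the cached values were reused; a larger second
// number suggests the closure was run again.
println(s"calls after first action: $afterFirst, after later actions: $afterLater")
```

Accumulator updates made inside transformations can be applied more than once
if a task is re-executed, so the numbers are only a rough indicator on a real
cluster.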

-- maropu

On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott <kellrott@soe.ucsc.edu> wrote:

> I changed the
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
>           (vid, vertex, message) =>
>             vertex.process(message.getOrElse(List[Message]()), ti)
>         ).cache()
>
> to
>
> curGraph = curGraph.outerJoinVertices(curMessages)(
>           (vid, vertex, message) =>
>             (vertex, message.getOrElse(List[Message]()))
>         ).mapVertices( (x,y) => y._1.process( y._2, ti ) ).cache()
>
> So the call to the 'process' method was moved out of the outerJoinVertices
> and into a separate mapVertices call, and the problem went away. Now,
> 'process' is only called once, during the correct cycle.
> So it would appear that outerJoinVertices keeps the closure and re-runs it
> whenever the values are needed again, while mapVertices actually caches the
> derived values.
>
> Is this a bug or a feature?
>
> Kyle
>
>
>
> On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott <kellrott@soe.ucsc.edu>
> wrote:
>
>> I'm trying to set up a simple iterative message/update problem in GraphX
>> (Spark 1.2.0), but I'm running into issues with the caching and
>> re-calculation of data. I'm trying to follow the example found in the
>> Pregel implementation of materializing and caching messages and graphs and
>> then unpersisting them after the next cycle has been done.
>> It doesn't seem to be working, because every cycle gets progressively
>> slower and it seems as if more and more of the values are being
>> re-calculated despite my attempts to cache them.
>>
>> The code:
>> ```
>>       var oldMessages : VertexRDD[List[Message]] = null
>>       var oldGraph : Graph[MyVertex, MyEdge ] = null
>>       curGraph = curGraph.mapVertices((x, y) => y.init())
>>       for (i <- 0 to cycle_count) {
>>         val curMessages = curGraph.aggregateMessages[List[Message]](x => {
>>           //send messages
>>           .....
>>         },
>>         (x, y) => {
>>            //collect messages into lists
>>             val out = x ++ y
>>             out
>>           }
>>         ).cache()
>>         curMessages.count()
>>         val ti = i
>>         oldGraph = curGraph
>>         curGraph = curGraph.outerJoinVertices(curMessages)(
>>           (vid, vertex, message) =>
>>             vertex.process(message.getOrElse(List[Message]()), ti)
>>         ).cache()
>>         curGraph.vertices.count()
>>         oldGraph.unpersistVertices(blocking = false)
>>         oldGraph.edges.unpersist(blocking = false)
>>         oldGraph = curGraph
>>         if (oldMessages != null ) {
>>           oldMessages.unpersist(blocking=false)
>>         }
>>         oldMessages = curMessages
>>       }
>> ```
>>
>> The MyVertex.process method takes the list of incoming messages, averages
>> them and returns a new MyVertex object. I've also set it up to append the
>> cycle number (the second argument) into a log file named after the vertex.
>> What ends up getting dumped into the log file for every vertex (in the
>> exact same pattern) is
>> ```
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 2
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 3
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 4
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 0
>> Cycle: 0
>> Cycle: 1
>> Cycle: 2
>> Cycle: 3
>> Cycle: 5
>> ```
>>
>> Any ideas about what I might be doing wrong with the caching, and how I
>> can avoid re-calculating so many of the values?
>>
>>
>> Kyle
>>
>>
>>
>


-- 
---
Takeshi Yamamuro
