spark-dev mailing list archives

From Robin East <robin.e...@xense.co.uk>
Subject Re: spark graphx storage RDD memory leak
Date Mon, 11 Apr 2016 18:13:10 GMT
This looks like https://issues.apache.org/jira/browse/SPARK-12655, which is
fixed in 2.0.
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action, Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action





> On 11 Apr 2016, at 07:23, zhang juntao <juntao.zhang.cn@gmail.com> wrote:
> 
> Thanks, Ted, for replying.
> Those three lines can't release the cached param graph; they only release g (graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()).
> In ConnectedComponents.scala, the graph passed to run is cached through ccGraph and is never released in Pregel:
>   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
>     val ccGraph = graph.mapVertices { case (vid, _) => vid }
>     def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
>       if (edge.srcAttr < edge.dstAttr) {
>         Iterator((edge.dstId, edge.srcAttr))
>       } else if (edge.srcAttr > edge.dstAttr) {
>         Iterator((edge.srcId, edge.dstAttr))
>       } else {
>         Iterator.empty
>       }
>     }
>     val initialMessage = Long.MaxValue
>     Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
>       vprog = (id, attr, msg) => math.min(attr, msg),
>       sendMsg = sendMessage,
>       mergeMsg = (a, b) => math.min(a, b))
>   } // end of connectedComponents
> }
> Thanks,
> juntao
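For readers less familiar with Pregel, the ConnectedComponents code quoted above is min-label propagation: every vertex starts with its own id as its label, and each round pushes the smaller label across every edge whose endpoints disagree, until no messages are sent. The plain-Scala sketch below replays that loop on one machine so the vprog/sendMsg/mergeMsg roles are visible without a cluster. It is a simplified model, not GraphX: the object name CcSimulation is hypothetical, vertex ids are plain Longs, every edge endpoint is assumed to appear in the vertex list, and every edge is scanned each round instead of only the active edges Pregel with EdgeDirection.Either would visit (the final labels should be the same).

```scala
// Hypothetical single-machine model of the loop ConnectedComponents.run drives
// through Pregel. No Spark types are used; VertexId is modelled as Long.
object CcSimulation {
  type VertexId = Long

  // Same rule as sendMessage above: send the smaller label to the other endpoint.
  def sendMessage(srcId: VertexId, dstId: VertexId,
                  srcAttr: VertexId, dstAttr: VertexId): Iterator[(VertexId, VertexId)] =
    if (srcAttr < dstAttr) Iterator((dstId, srcAttr))
    else if (srcAttr > dstAttr) Iterator((srcId, dstAttr))
    else Iterator.empty

  // Assumes every edge endpoint also appears in `vertices`.
  def run(vertices: Seq[VertexId], edges: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] = {
    val initialMessage = Long.MaxValue
    // ccGraph plus the initial vprog: min(id, Long.MaxValue) == id, so each
    // vertex starts labelled with its own id.
    var labels: Map[VertexId, VertexId] =
      vertices.map(v => v -> math.min(v, initialMessage)).toMap
    var active = true
    while (active) {
      // sendMsg over the edges, then mergeMsg (min) per receiving vertex.
      val messages: Map[VertexId, VertexId] = edges
        .flatMap { case (s, d) => sendMessage(s, d, labels(s), labels(d)) }
        .groupBy(_._1)
        .map { case (v, ms) => v -> ms.map(_._2).reduce((a, b) => math.min(a, b)) }
      // vprog: keep the smaller of the current label and the merged message.
      labels = labels ++ messages.map { case (v, m) => v -> math.min(labels(v), m) }
      // Stop once a round produces no messages.
      active = messages.nonEmpty
    }
    labels
  }
}
```

With edges 1-2, 2-3 and 4-5 plus an isolated vertex 6, the labels converge to 1 for {1, 2, 3}, 4 for {4, 5} and 6 for vertex 6.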
> 
> 
>> Begin forwarded message:
>> 
>> From: Ted Yu <yuzhihong@gmail.com>
>> Subject: Re: spark graphx storage RDD memory leak
>> Date: April 11, 2016 at 1:15:23 AM GMT+8
>> To: zhang juntao <juntao.zhang.cn@gmail.com>
>> Cc: "dev@spark.apache.org" <dev@spark.apache.org>
>> 
>> I see the following code toward the end of the method:
>> 
>>       // Unpersist the RDDs hidden by newly-materialized RDDs
>>       oldMessages.unpersist(blocking = false)
>>       prevG.unpersistVertices(blocking = false)
>>       prevG.edges.unpersist(blocking = false)
>> 
>> Wouldn't the above achieve the same effect?
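Ted's and juntao's positions are easier to compare with the caching pattern written out. The toy model below is not Spark: a "graph" is just a name, and the hypothetical `cached` set stands in for the persisted RDDs a SparkContext tracks. It follows the pattern quoted in this thread (cache g, then in each iteration cache the new graph and unpersist the previous one) and leaves out the message RDDs. Under those assumptions, the loop's unpersist calls only release graphs that Pregel created itself, so an input graph the caller had cached stays cached unless something like juntao's extra lines releases it.

```scala
import scala.collection.mutable

// Toy bookkeeping model, not Spark: a "graph" is just a name and `cached`
// stands in for the set of persisted RDDs that a SparkContext tracks.
final class PregelCacheModel {
  val cached: mutable.LinkedHashSet[String] = mutable.LinkedHashSet.empty[String]

  def cache(name: String): String = { cached += name; name }
  def unpersist(name: String): Unit = { cached -= name }

  // Mirrors the caching pattern of Pregel.apply quoted in this thread,
  // leaving out the message RDDs.
  def pregel(graph: String, iterations: Int, unpersistInput: Boolean): String = {
    // var g = graph.mapVertices(...).cache()
    var g = cache(s"$graph.mapVertices")
    // juntao's proposed extra lines: release the caller's input graph.
    if (unpersistInput) unpersist(graph)
    for (i <- 1 to iterations) {
      val prevG = g
      g = cache(s"g$i")
      // Ted's lines: "Unpersist the RDDs hidden by newly-materialized RDDs".
      unpersist(prevG)
    }
    g
  }
}
```

Starting from a cached ccGraph and running three iterations, the model ends with {ccGraph, g3} cached when unpersistInput is false and only {g3} when it is true; g3 is the result handed back to the caller.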
>> 
>> On Sun, Apr 10, 2016 at 9:08 AM, zhang juntao <juntao.zhang.cn@gmail.com> wrote:
>> hi experts,
>> 
>> I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
>> note that the Scala environment shares the same SparkContext and SQLContext instances.
>> I call the connected components algorithm for some business logic, and
>> found that every time a job finished, some graph storage RDDs weren't released.
>> After several runs, a lot of storage RDDs remain even though all the jobs have finished.
>> 
>> <PastedGraphic-1.png>
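Because Zeppelin keeps one SparkContext alive across paragraphs, cached RDDs left behind by a job accumulate instead of disappearing when the job ends. One way to watch this from a notebook, sketched below, is SparkContext.getPersistentRDDs, which returns the RDDs currently registered as persisted. The object name PersistedRddReport and both helpers are hypothetical, and the cleanup helper is a blunt instrument: on a shared context it also drops caches that other notebooks may still rely on.

```scala
import org.apache.spark.SparkContext

// Hypothetical diagnostic helpers for a long-lived, shared SparkContext
// such as the `sc` that Zeppelin paragraphs share.
object PersistedRddReport {
  // One line per RDD still registered as persisted: id, name, storage level.
  def describe(sc: SparkContext): Seq[String] =
    sc.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
      s"$id ${rdd.name} ${rdd.getStorageLevel.description}"
    }

  // Blunt cleanup: unpersist every persisted RDD whose id is not in `keep`
  // and return how many were released. On a shared context this also drops
  // caches that other notebooks may still be using.
  def unpersistAllExcept(sc: SparkContext, keep: Set[Int]): Int = {
    val victims = sc.getPersistentRDDs
      .collect { case (id, rdd) if !keep.contains(id) => rdd }
      .toList
    victims.foreach(_.unpersist(blocking = false))
    victims.size
  }
}
```

Recording `sc.getPersistentRDDs.keySet` before a connected-components run and calling describe afterwards would show which RDDs that run left persisted; unpersistAllExcept with the same key set would also release the returned graph's cached RDDs, so it only makes sense once the result is no longer needed.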
>> 
>> So I checked the code of connectedComponents and found what may be a problem in Pregel.scala:
>> when the param graph has been cached, there isn't any way to unpersist it,
>> so I added the two unpersist calls on graph below (highlighted in red in my original message) to solve the problem:
>> def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
>>    (graph: Graph[VD, ED],
>>     initialMsg: A,
>>     maxIterations: Int = Int.MaxValue,
>>     activeDirection: EdgeDirection = EdgeDirection.Either)
>>    (vprog: (VertexId, VD, A) => VD,
>>     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>>     mergeMsg: (A, A) => A)
>>   : Graph[VD, ED] =
>> {
>>   ......
>>   var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>>   graph.unpersistVertices(blocking = false)
>>   graph.edges.unpersist(blocking = false)
>>   ......
>> 
>> } // end of apply
>> 
>> I'm not sure if this is a bug.
>> Thank you for your time,
>> juntao
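An alternative to patching Pregel.scala is to release the input graph from the calling side once the result has been computed. The helper below is a hypothetical sketch, not the SPARK-12655 fix: it calls connectedComponents(), forces the result's vertices and edges with count() while the input is still cached, and then calls Graph.unpersist on the input. It cannot reach anything connectedComponents caches internally (for example through ccGraph), so on versions without that fix it may reduce the leftover RDDs rather than eliminate them. If the result happens to share cached RDDs with the input, unpersisting costs recomputation later rather than correctness, because Spark rebuilds uncached partitions from lineage.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Graph, VertexId}

// Hypothetical helper, not part of GraphX.
object CcWithCleanup {
  def connectedComponentsAndRelease[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    // connectedComponents() comes from GraphOps via Graph's implicit conversion.
    val cc = graph.connectedComponents()
    // Materialize the result first so it is computed while the input is cached.
    cc.vertices.count()
    cc.edges.count()
    // Release the caller's input graph (both vertices and edges).
    graph.unpersist(blocking = false)
    cc
  }
}
```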
>> 
>> 
>> 
> 

