spark-user mailing list archives

From Aurélien Bellet <aurelien.bel...@telecom-paristech.fr>
Subject Re: Memory-efficient successive calls to repartition()
Date Tue, 01 Sep 2015 14:48:43 GMT
Dear Alexis,

Thanks again for your reply. After reading about checkpointing I have 
modified my sample code as follows:

for i in range(1000):
    print i
    data2 = data.repartition(50).cache()
    if (i + 1) % 10 == 0:
        data2.checkpoint()
    data2.first()    # materialize rdd
    data.unpersist() # unpersist previous version
    data = data2

The data is checkpointed every 10 iterations to a directory that I 
specified. While this seems to improve things a little bit, a lot of data 
is still written to local disk (the appcache directory, shown as "non HDFS 
files" in Cloudera Manager) *besides* the checkpoint files (which are 
regular HDFS files), and the application eventually runs out of disk 
space. The same is true even if I checkpoint at every iteration.
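
For completeness, here is how the checkpoint directory is set up, plus two 
sanity checks that could be added inside the loop to verify that the 
lineage is really truncated (the directory below is a placeholder for the 
one I actually use):

sc.setCheckpointDir("hdfs:///user/XXX/checkpoints") # placeholder path

# in a checkpointing iteration, right after the data2.first() action:
print data2.isCheckpointed() # should be True once the checkpoint is written
print data2.toDebugString()  # lineage should now start from the checkpoint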

What am I doing wrong? Maybe some garbage collector setting?

Thanks a lot for the help,

Aurelien

On 24/08/2015 at 10:39, alexis GILLAIN wrote:
> Hi Aurelien,
>
> The first version of your code should create a new RDD in memory at each
> iteration (check the web UI).
> The second version will unpersist the previous RDD, but that's not the
> main problem.
>
> I think your trouble comes from the long lineage, since .cache() keeps
> track of the lineage for recovery.
> You should have a look at checkpointing:
> https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>
> You can also have a look at the code of other iterative algorithms in
> mllib for best practices.
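>
> Roughly, in Python the pattern would look like this (untested sketch; the
> checkpoint directory and the interval of 10 iterations are arbitrary, and
> PeriodicRDDCheckpointer additionally cleans up old checkpoint files):
>
> sc.setCheckpointDir("hdfs:///tmp/checkpoints") # any HDFS directory
>
> # data is the cached RDD from your code below
> for i in range(1000):
>     data2 = data.repartition(50).cache()
>     if (i + 1) % 10 == 0:
>         data2.checkpoint()  # cut the lineage every 10 iterations
>     data2.count()           # action: materializes data2 and writes the checkpoint
>     data.unpersist()        # drop the previous cached copy
>     data = data2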
>
> 2015-08-20 17:26 GMT+08:00 abellet <aurelien.bellet@telecom-paristech.fr>:
>
>     Hello,
>
>     For the needs of my application, I need to periodically "shuffle" the
>     data across nodes/partitions of a reasonably large dataset. This is an
>     expensive operation, but I only need to do it every now and then.
>     However, it seems that I am doing something wrong, because as the
>     iterations go on the memory usage increases, causing the job to spill
>     onto HDFS, which eventually gets full. I am also getting some "Lost
>     executor" errors that I don't get if I don't repartition.
>
>     Here's a basic piece of code which reproduces the problem:
>
>     data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
>     data.count()
>     for i in range(1000):
>         data = data.repartition(50).persist()
>         # below, several operations are done on data
>
>
>     What am I doing wrong? I tried the following but it doesn't solve
>     the issue:
>
>     for i in range(1000):
>         data2 = data.repartition(50).persist()
>         data2.count()    # materialize rdd
>         data.unpersist() # unpersist previous version
>         data = data2
>
>
>     Help and suggestions on this would be greatly appreciated! Thanks a lot!
>


