spark-user mailing list archives

From: Aurélien Bellet <aurelien.bel...@telecom-paristech.fr>
Subject: Re: Memory-efficient successive calls to repartition()
Date: Tue, 08 Sep 2015 17:22:36 GMT
Hi,

This is what I tried:

for i in range(1000):
     print i
     data2=data.repartition(50).cache()
     if (i+1) % 10 == 0:
         data2.checkpoint()
     data2.first() # materialize rdd
     data.unpersist() # unpersist previous version
     sc._jvm.System.gc()
     data=data2

Unfortunately, I do not see any significant improvement from the call
to sc._jvm.System.gc()...

I checked the web UI and there is a single RDD in memory, so
unpersist() works as expected, but I still have no way to trigger the
cleaning of the shuffle files...
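
For completeness, here is a minimal sketch of the cleaner-related
settings one could try when creating the SparkContext (the config keys
are taken from the Spark configuration docs; the app name and the
checkpoint directory below are just placeholders, not my actual setup):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("repartition-loop")  # placeholder
        # context cleaner: clean RDD/shuffle/broadcast state for objects
        # that get garbage-collected on the driver (enabled by default)
        .set("spark.cleaner.referenceTracking", "true")
        # also make the cleaning thread block on shuffle cleanup tasks
        .set("spark.cleaner.referenceTracking.blocking.shuffle", "true"))
sc = SparkContext(conf=conf)
sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")  # placeholder directory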

Aurélien

On 9/2/15 4:11 PM, alexis GILLAIN wrote:
> Just made some tests on my laptop.
>
> Deletion of the files is not immediate, but a System.gc() call does
> the job on the shuffle files of a checkpointed RDD.
> It should solve your problem (`sc._jvm.System.gc()` in Python, as
> pointed out in the Databricks link from my previous message).
>
>
> 2015-09-02 20:55 GMT+08:00 Aurélien Bellet
> <aurelien.bellet@telecom-paristech.fr>:
>
>     Thanks a lot for the useful link and comments, Alexis!
>
>     First of all, the problem occurs without doing anything else in the
>     code (except, of course, loading my data from HDFS at the beginning),
>     so it definitely comes from the shuffling. You're right that, in the
>     current version, checkpoint files are not removed and take up some
>     space in HDFS (this is easy to fix). But this is negligible compared
>     to the non-HDFS files, which keep growing as the iterations go on. So
>     I agree with you that this must come from the shuffle operations: it
>     seems that the shuffle files are not removed during execution (they
>     are only removed when I stop/kill the application), despite the use
>     of checkpointing.
>
>     The class you mentioned is very interesting, but I did not find a
>     way to use it from PySpark. I will try to implement my own version
>     by looking at the source code. But besides the queueing and removal
>     of checkpoint files, I do not really see anything special there that
>     could solve my issue.
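>
>     For the record, here is a rough sketch of what such a helper could
>     look like in PySpark (the class name, interval and bookkeeping are
>     mine, not a port of the Scala class; old checkpoint files on HDFS
>     would still have to be deleted separately through the Hadoop
>     FileSystem API):
>
>     class SimplePeriodicCheckpointer(object):
>         """Persist each new RDD, checkpoint every N updates, and
>         unpersist the previous ones (sketch only)."""
>
>         def __init__(self, checkpoint_interval=10):
>             self.interval = checkpoint_interval
>             self.calls = 0
>             self.persisted = []  # previously persisted RDDs
>
>         def update(self, rdd):
>             self.calls += 1
>             rdd.cache()
>             if self.calls % self.interval == 0:
>                 rdd.checkpoint()  # mark before running any job on it
>             rdd.first()           # action: materialize + write checkpoint
>             while self.persisted:
>                 self.persisted.pop().unpersist()  # drop older RDDs
>             self.persisted.append(rdd)
>             return rdd
>
>     and then in the loop simply do
>     data = checkpointer.update(data.repartition(50)).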
>
>     I will continue to investigate this. I just found out that I can use
>     a command-line browser to look at the web UI (I cannot access the
>     server in graphical mode), which should help me understand what's
>     going on. I will also try the workarounds mentioned in the link and
>     keep you posted.
>
>     Again, thanks a lot!
>
>     Best,
>
>     Aurelien
>
>
>     On 02/09/2015 14:15, alexis GILLAIN wrote:
>
>         Aurélien,
>
>         From what you're saying, I can think of a couple of things,
>         bearing in mind that I don't know what you are doing in the rest
>         of the code:
>
>         - There are a lot of non-HDFS writes; they come from the rest of
>         your code and/or from repartition(). Repartitioning involves a
>         shuffle and the creation of files on disk. I would have said that
>         the problem comes from that, but I just checked and checkpoint()
>         is supposed to delete the shuffle files:
>         https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
>         (this looks exactly like your problem, so you could maybe try the
>         other workarounds).
>         Still, you may be doing a lot of shuffling in the rest of the
>         code (you should see the amount of shuffle data written in the
>         web UI), and consider increasing the available disk space... if
>         you can do that.
>
>         - On the HDFS side, the class I pointed to has an update function
>         which "automatically handles persisting and (optionally)
>         checkpointing, as well as unpersisting and removing checkpoint
>         files". I am not sure your checkpointing approach removes the
>         previous checkpoint files (see the sketch at the end of this
>         message).
>
>         In the end, does the disk space error come from HDFS growing or
>         from the local disk growing?
>
>         You should check the web UI to identify which tasks spill data
>         to disk and verify whether the shuffle files are properly deleted
>         when you checkpoint your RDD.
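>
>         For example, an old checkpoint directory can be removed from
>         PySpark through the Hadoop FileSystem API via the JVM gateway.
>         A rough sketch (old_rdd stands for the previously checkpointed
>         RDD, and this assumes the checkpoint sits on the default
>         filesystem):
>
>         old_path = old_rdd.getCheckpointFile()
>         if old_path is not None:
>             jvm = sc._jvm
>             hadoop_conf = sc._jsc.hadoopConfiguration()
>             fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
>             # recursively delete the old checkpoint directory
>             fs.delete(jvm.org.apache.hadoop.fs.Path(old_path), True)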
>
>
>         Regards,
>
>
>         2015-09-01 22:48 GMT+08:00 Aurélien Bellet
>         <aurelien.bellet@telecom-paristech.fr>:
>
>
>              Dear Alexis,
>
>              Thanks again for your reply. After reading about
>              checkpointing I have modified my sample code as follows:
>
>              for i in range(1000):
>                   print i
>                   data2=data.repartition(50).cache()
>                   if (i+1) % 10 == 0:
>                       data2.checkpoint()
>                   data2.first() # materialize rdd
>                   data.unpersist() # unpersist previous version
>                   data=data2
>
>              The data is checkpointed every 10 iterations to a directory
>              that I specified. While this seems to improve things a
>              little bit, there is still a lot of writing to disk (the
>              appcache directory, shown as "non-HDFS files" in Cloudera
>              Manager) *besides* the checkpoint files (which are regular
>              HDFS files), and the application eventually runs out of
>              disk space. The same is true even if I checkpoint at every
>              iteration.
>
>              What am I doing wrong? Maybe some garbage collector setting?
>
>              Thanks a lot for the help,
>
>              Aurelien
>
>              On 24/08/2015 10:39, alexis GILLAIN wrote:
>
>                  Hi Aurelien,
>
>                  The first code snippet should create a new RDD in memory
>                  at each iteration (check the web UI).
>                  The second snippet will unpersist the RDD, but that's
>                  not the main problem.
>
>                  I think you are running into trouble because of a long
>                  lineage, as .cache() keeps track of the lineage for
>                  recovery.
>                  You should have a look at checkpointing:
>                  https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
>                  https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala
>
>                  You can also have a look at the code of other iterative
>                  algorithms in MLlib for best practices.
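>
>                  Roughly, the basic checkpointing pattern looks like this
>                  (the directory below is just an example):
>
>                  sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")
>
>                  data = data.repartition(50).cache()
>                  data.checkpoint()  # mark before any job runs on this RDD
>                  data.count()       # an action triggers the checkpoint write
>                  # the lineage is then truncated to the checkpoint files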
>
>                  2015-08-20 17:26 GMT+08:00 abellet
>                  <aurelien.bellet@telecom-paristech.fr>:
>
>                       Hello,
>
>                       For the needs of my application, I need to
>                       periodically "shuffle" the data across the
>                       nodes/partitions of a reasonably large dataset.
>                       This is an expensive operation, but I only need to
>                       do it every now and then. However, it seems that I
>                       am doing something wrong, because as the iterations
>                       go on, the memory usage increases, causing the job
>                       to spill onto HDFS, which eventually gets full. I
>                       am also getting some "Lost executor" errors that I
>                       do not get if I do not repartition.
>
>                       Here's a basic piece of code which reproduces the
>                       problem:
>
>                       data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
>                       data.count()
>                       for i in range(1000):
>                           data = data.repartition(50).persist()
>                           # below, several operations are done on data
>
>
>                       What am I doing wrong? I tried the following, but
>                       it doesn't solve the issue:
>
>                       for i in range(1000):
>                           data2 = data.repartition(50).persist()
>                           data2.count()     # materialize rdd
>                           data.unpersist()  # unpersist previous version
>                           data = data2
>
>
>                       Help and suggestions on this would be greatly
>                       appreciated! Thanks a lot!

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

