You could try rdd.persist(StorageLevel.MEMORY_AND_DISK).flatMap(...) (or StorageLevel.DISK_ONLY). I think the MEMORY_AND_DISK storage level means Spark will try to keep the data in memory and, if there isn't sufficient space, spill it to disk.
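
In case it helps, here is a minimal sketch of persisting the output of the flatMap itself rather than its input. The app name, the local master, the stand-in input and the flatMap function are all made up for illustration, since your actual job isn't shown. As far as I know, persist() is lazy and blocks are stored one partition at a time as each partition is computed, so the whole output does not have to fit in memory first; a partition that doesn't fit in memory is written to disk instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistFlatMapSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical app name and local master, for illustration only.
    val conf = new SparkConf()
      .setAppName("persist-flatmap-sketch")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in input; the real data and flatMap function are not in the thread.
      val input = sc.parallelize(1 to 1000, numSlices = 8)

      // persist() only marks the RDD; nothing is computed or stored yet.
      val expanded = input
        .flatMap(x => Seq(x, x * 2, x * 3))
        .persist(StorageLevel.MEMORY_AND_DISK)

      // The first action computes each partition and stores it in memory,
      // or on disk when it does not fit in memory.
      val total = expanded.count()

      // Later actions reuse the stored partitions instead of rerunning flatMap.
      val distinctValues = expanded.distinct().count()

      println(s"total=$total distinct=$distinctValues")
    } finally {
      sc.stop()
    }
  }
}
```

A single partition still has to be processed by one task, so if individual partitions are very large it may also be worth increasing the number of partitions before the flatMap.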

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea <octavian.ganea@inf.ethz.ch> wrote:
Hi,

Is there any way to force the output RDD of a flatMap op to be stored in
both memory and disk as it is computed? My RAM cannot fit the entire
output of flatMap, so it really needs to start using disk once the RAM
is full. I didn't find any way to force this.

Also, what is the memory overhead of flatMap? From my computations, the
output RDD should fit in memory, but I get the following error after a while
(and I know it's because of memory issues, since running the program with
1/3 of the input data finishes successfully):

15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
        at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
        at java.lang.Thread.run(Thread.java:745)


I've also seen this:
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
but my understanding is that one should apply something like
rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire
output of flatMap is first stored in memory (which is not possible in my
case) and is written to disk only once that is done. Please correct me if
I'm wrong. Anyway, I've tried this, but I got the same error.

My config:

    conf.set("spark.cores.max", "128")
    conf.set("spark.akka.frameSize", "1024")
    conf.set("spark.executor.memory", "125g")
    conf.set("spark.shuffle.file.buffer.kb", "1000")
    conf.set("spark.shuffle.consolidateFiles", "true")


