Shuffle data is always stored on disk, so it's unlikely to cause OOM. Your input data, read as an RDD, may be causing the OOM, so that's where you can use a different memory configuration.
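Roughly along these lines (a sketch only; the app name, memory size and cache fraction are placeholders to tune for your cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values: give each executor a larger heap and leave less of it
// to the RDD cache so the computation itself has room.
val conf = new SparkConf()
  .setAppName("large-json-job")
  .set("spark.executor.memory", "10g")
  .set("spark.storage.memoryFraction", "0.3")

val sc = new SparkContext(conf)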

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi



On Tue, Mar 11, 2014 at 9:20 AM, sparrow <domen@celtra.com> wrote:
I don't understand how exactly that will help. There are no persisted RDDs in storage. Our input data is ~100 GB, but the output of the flatMap is ~40 MB. The small RDD is then persisted.

If I understand you correctly, memory configuration should not affect the shuffle data?




On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] <[hidden email]> wrote:
Shuffle data is not kept in memory. Did you try additional memory configurations? (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)
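As a rough illustration of what that section describes (a sketch with a placeholder path, not your exact job), a serialized, disk-backed storage level lets a cached RDD that is larger than memory spill to disk instead of failing:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("persistence-sketch"))

// Placeholder input path. MEMORY_AND_DISK_SER keeps cached partitions serialized
// and writes the ones that do not fit in memory out to disk.
val lines = sc.textFile("s3n://bucket/input")
lines.persist(StorageLevel.MEMORY_AND_DISK_SER)

println(lines.count())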

Mayur Rustagi
Ph: +1 (760) 203 3257


On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]> wrote:
Hi

I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to process a large data set (it does not fit in memory) that consists of JSON entries. These are the transformations applied:

sc.textFile(s3url)                       // read files from S3
  .keyBy(_.parseJson.id)                 // key by the id located in the JSON string
  .groupByKey(number_of_group_tasks)     // group by id
  .flatMap { case (key, lines) => ... }  // do some stuff
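For concreteness, here is a self-contained sketch of roughly that pipeline; the S3 path, the partition count and the parseId helper are placeholders for our real code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD operations such as groupByKey

object GroupByIdJob {
  // Placeholder for the real JSON parsing: pull the "id" field out of a JSON line.
  private val IdPattern = """"id"\s*:\s*"([^"]+)""".r
  def parseId(line: String): String =
    IdPattern.findFirstMatchIn(line).map(_.group(1)).getOrElse("unknown")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("group-by-id"))

    val s3url = "s3n://bucket/input/*.json"   // placeholder
    val numberOfGroupTasks = 400              // placeholder partition count

    val result = sc.textFile(s3url)           // read files from S3
      .keyBy(parseId)                         // key by the id in the JSON string
      .groupByKey(numberOfGroupTasks)         // group by id (wide dependency, so a shuffle)
      .flatMap { case (id, lines) =>          // "do some stuff" per group
        Seq(s"$id: ${lines.size} lines")
      }

    println(result.count())                   // an action, just to force evaluation
    sc.stop()
  }
}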

In the web UI I can see the keyBy operation doing a shuffle write. If I understand correctly, the groupByKey transformation creates a wide RDD dependency and therefore requires a shuffle write. I have already increased spark.akka.askTimeout to 30 seconds, and the job still fails with errors on the workers:

Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
        ... 25 more


Before the error I can see this kind of logs:

14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them

Can you please help me understand what is going on? Is the whole shuffle-write RDD kept in memory, and does the cluster, once it runs out of memory, start garbage collecting and re-fetching from S3?

If this is the case, does Spark require additional configuration to write shuffle data to disk effectively?

Regards, Domen






