Shuffle data is not kept in memory. Did you try additional memory configurations? (See https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)
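
A disk-backed storage level is usually the first thing to try. A minimal sketch (the app name, heap size, S3 path, and keying function below are all placeholders, not your actual job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Placeholder app name and heap size; tune the heap to what your
// 13 GB workers can actually spare.
val conf = new SparkConf()
  .setAppName("group-by-id")
  .set("spark.executor.memory", "10g")
val sc = new SparkContext(conf)

// grouped stands in for the RDD your groupByKey produces; keyBy(_.length)
// is only a stand-in for your JSON keying. MEMORY_AND_DISK spills
// partitions that do not fit in RAM to local disk instead of dropping
// them and recomputing from S3.
val grouped = sc.textFile("s3n://bucket/path").keyBy(_.length).groupByKey()
grouped.persist(StorageLevel.MEMORY_AND_DISK)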

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi



On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <domen@celtra.com> wrote:
Hi

I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to process a large data set (it does not fit in memory) that consists of JSON entries. These are the transformations applied:

sc.textFile(s3url)                       // read files from S3
  .keyBy(_.parseJson.id)                 // key by the id located in the JSON string
  .groupByKey(number_of_group_tasks)     // group by id
  .flatMap { case (key, lines) => ... }  // do some stuff

In the web UI I can see the keyBy stage doing a shuffle write. If I understand correctly, the groupByKey transformation creates a wide RDD dependency and thus requires a shuffle write. I have already increased spark.akka.askTimeout to 30 seconds (see the sketch after the stack trace), and the job still fails with errors on the workers:

Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
        ... 25 more
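
For reference, this is roughly how the timeout was raised (a sketch; spark.akka.askTimeout is read in seconds, which matches the 30000 ms in the trace):

import org.apache.spark.SparkConf

// Raise the Akka ask timeout from its default before creating the
// SparkContext; the value is interpreted as seconds.
val conf = new SparkConf().set("spark.akka.askTimeout", "30")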


Before the error I can see this kind of logs:

14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them

Can you please help me understand what is going on? Is the whole shuffle-write RDD kept in memory, and when the cluster runs out of memory does it start garbage collecting and re-fetching from S3?

If this is the case, does Spark require additional configuration for effective shuffle writes to disk?
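
(The only disk-related setting I could find is spark.local.dir, which is where shuffle files are written; the path below is hypothetical:)

import org.apache.spark.SparkConf

// Shuffle output lands under spark.local.dir; pointing it at a larger
// volume is the closest knob I could find. The path is hypothetical.
val conf = new SparkConf().set("spark.local.dir", "/mnt/spark-tmp")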

Regards, Domen