spark-user mailing list archives

From Domen Grabec <do...@celtra.com>
Subject Re: Out of memory on large RDDs
Date Tue, 11 Mar 2014 15:35:43 GMT
Hi

I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to
process a large data set (it does not fit in memory) that consists of JSON
entries. These are the transformations applied:

sc.textFile(s3url)                        // read files from S3
  .keyBy(_.parseJson.id)                  // key by the id located in the JSON string
  .groupByKey(number_of_group_tasks)      // group by id
  .flatMap { case (key, lines) => ... }   // do some stuff
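
For context, the whole job looks roughly like this (a simplified sketch: extractId
is a stand-in for our real JSON parsing, and the app name, master URL, S3 paths and
partition count are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._                  // pair-RDD operations such as groupByKey

object GroupJsonById {
  // Stand-in for the real JSON parsing: pulls the value of the "id" field out of one line.
  def extractId(line: String): String = {
    val idPattern = "\"id\"\\s*:\\s*\"([^\"]+)\"".r
    idPattern.findFirstMatchIn(line).map(_.group(1)).getOrElse("unknown")
  }

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("group-json-by-id")                   // placeholder
      .setMaster("spark://master:7077")                 // placeholder
    val sc = new SparkContext(conf)

    val numberOfGroupTasks = 64                         // placeholder partition count

    sc.textFile("s3n://my-bucket/input/*")              // read files from S3
      .keyBy(extractId)                                 // key by the id in the JSON string
      .groupByKey(numberOfGroupTasks)                   // group by id
      .flatMap { case (key, lines) => lines }           // do some stuff (identity here)
      .saveAsTextFile("s3n://my-bucket/output")         // placeholder output path
  }
}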

In the web UI I can see the keyBy stage doing a shuffle write. If I understand
correctly, the groupByKey transformation creates a wide RDD dependency and
therefore requires a shuffle write. I have already increased
spark.akka.askTimeout to 30 seconds (config snippet at the end of this mail),
but the job still fails with these errors on the workers:

Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
        ... 25 more


Before the error I see log lines like these:

14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them

Can you please help me understand what is going on? Is the whole RDD that is
being shuffle-written kept in memory, and once the cluster runs out of memory
does it start garbage collecting and re-fetching data from S3?

If that is the case, does Spark require additional configuration to write the
shuffle output to disk effectively?
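
For reference, the ask timeout is the only setting I have changed from the defaults
so far; it is set on the driver roughly like this (same placeholder app name as in
the sketch above):

val conf = new SparkConf()
  .setAppName("group-json-by-id")
  .set("spark.akka.askTimeout", "30")   // seconds; increased from the default
val sc = new SparkContext(conf)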

Regards, Domen
