spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mayur Rustagi <mayur.rust...@gmail.com>
Subject Re: Out of memory on large RDDs
Date Tue, 11 Mar 2014 15:50:45 GMT
Shuffle data is not kept in memory. Did you try additional memory
configurations(
https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence
)

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <domen@celtra.com> wrote:

> Hi
>
> I have a spark cluster with 4 workers each with 13GB ram. I would like to
> process a large data set (does not fit in memory) that consists of JSON
> entries. These are the transformations applied:
>
> SparkContext.textFile(s3url). // read files from s3
> keyBy(_.parseJson.id) // key by id that is located in json string
> groupByKey(number_of_group_tasks) //group by id
> flatMap(case (key,lines) => { //do some stuff })
>
> In the web view I can see a key by operation doing a shuffle write. If I
> understand correctly the groupByKey transformation creates a wide RDD
> dependency thus requiring a shuffle write. I have already increased
> spark.akka.askTimeout to 30 seconds and still job fails with errors on
> workers:
>
> Error communicating with MapOutputTracker
>         at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>         at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>         at
> org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>         at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>         at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>         at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>         at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:724)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [30000] milliseconds
>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>         at akka.dispatch.Await$.result(Future.scala:74)
>         at
> org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>         ... 25 more
>
>
> Before the error I can see this kind of logs:
>
> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for
> shuffle 0, fetching them 14/03/11 14:29:40 INFO MapOutputTracker: Don't
> have map outputs for shuffle 0, fetching them 14/03/11 14:29:40 INFO
> MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>
> Can you please help me understand what is going on? Is the whole shuffle
> write RDD kept in memory and when cluster runs out of memory it starts
> garbage collecting and re fetching from s3?
>
> If this is the case does spark require additional configuration for
> effective shuffle write to disk?
>
> Regards, Domen
>

Mime
View raw message