spark-user mailing list archives

From Grega Kespret <gr...@celtra.com>
Subject Re: Out of memory on large RDDs
Date Tue, 11 Mar 2014 22:07:38 GMT
> Your input data read as RDD may be causing OOM, so that's where you can use different memory configuration.

We are not getting any OOM exceptions, just Akka future timeouts in MapOutputTracker and unsuccessful fetches of shuffle outputs, which are then refetched.

What is the usual practice for debugging such errors?

Questions:
- Why are the MapOutputTracker requests timing out (and how do we debug this properly)?
- What is the purpose of MapOutputTracker?
- How can we check the size of per-task objects?
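
To make the second question concrete: judging by the stack trace (getServerStatuses calling askTracker), the tracker looks like the place where reduce tasks ask where each map task wrote its shuffle output. Below is a toy model of that reading, in plain Scala with no Spark dependency. ToyMapOutputTracker, this MapStatus case class, and the worker names are hypothetical stand-ins for illustration, not Spark's actual classes.

```scala
import scala.collection.mutable

// Toy model only: none of these types are Spark's own.
// One entry per finished map task: where it ran and how many
// bytes it wrote for each reduce partition.
final case class MapStatus(host: String, sizesPerReducer: Vector[Long])

final class ToyMapOutputTracker {
  // shuffleId -> (map task index -> status)
  private val outputs = mutable.Map.empty[Int, mutable.Map[Int, MapStatus]]

  // Conceptually called on the driver when a map task finishes its shuffle write.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit =
    outputs.getOrElseUpdate(shuffleId, mutable.Map.empty[Int, MapStatus]).update(mapId, status)

  // Conceptually called by a reduce task: for every map output of this shuffle,
  // which host holds it and how large is the slice for this reducer?
  def getServerStatuses(shuffleId: Int, reduceId: Int): Seq[(String, Long)] =
    outputs.getOrElse(shuffleId, mutable.Map.empty[Int, MapStatus])
      .toSeq
      .sortBy(_._1) // order by map task index
      .map { case (_, status) => (status.host, status.sizesPerReducer(reduceId)) }
}

object ToyShuffleDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new ToyMapOutputTracker
    // Two map tasks of shuffle 0, each writing data for two reduce partitions.
    tracker.registerMapOutput(0, 0, MapStatus("worker-1", Vector(10L, 20L)))
    tracker.registerMapOutput(0, 1, MapStatus("worker-2", Vector(30L, 40L)))
    // Reduce task 1 looks up where its input lives before fetching it.
    println(tracker.getServerStatuses(0, 1))
  }
}
```

If that reading is right, the timeout would mean the reduce-side lookup is not being answered in time, rather than the shuffle data itself being lost.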

Thanks,
Grega

> On 11 Mar 2014, at 18:43, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:
> 
> Shuffle data is always stored on disk, so it's unlikely to cause OOM. Your input data read as RDD may be causing OOM, so that's where you can use different memory configuration.
> 
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi
> 
> 
> 
>> On Tue, Mar 11, 2014 at 9:20 AM, sparrow <domen@celtra.com> wrote:
>> I don't understand how exactly that will help. There are no persisted RDDs in storage. Our input data is ~100 GB, but the output of the flatMap is ~40 MB. The small RDD is then persisted.
>> 
>> Memory configuration should not affect shuffle data if I understand you correctly?
>> 
>>> On Tue, Mar 11, 2014 at 4:52 PM, Mayur Rustagi [via Apache Spark User List] <[hidden email]> wrote:
>>> Shuffle data is not kept in memory. Did you try additional memory configurations (https://spark.incubator.apache.org/docs/latest/scala-programming-guide.html#rdd-persistence)?
>>> 
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi
>>> 
>>> 
>>> 
>>>> On Tue, Mar 11, 2014 at 8:35 AM, Domen Grabec <[hidden email]> wrote:
>>>> Hi
>>>> 
>>>> I have a Spark cluster with 4 workers, each with 13 GB of RAM. I would like to process a large data set (it does not fit in memory) that consists of JSON entries. These are the transformations applied:
>>>> 
>>>> sc.textFile(s3url)                     // sc is the SparkContext; read files from S3
>>>>   .keyBy(_.parseJson.id)               // key by the id located in the JSON string
>>>>   .groupByKey(number_of_group_tasks)   // group by id
>>>>   .flatMap { case (key, lines) => /* do some stuff */ }
>>>> 
>>>> In the web UI I can see the keyBy operation doing a shuffle write. If I understand correctly, the groupByKey transformation creates a wide RDD dependency and therefore requires a shuffle write. I have already increased spark.akka.askTimeout to 30 seconds, and the job still fails with errors on the workers:
>>>> 
>>>> Error communicating with MapOutputTracker
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:84)
>>>>         at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:170)
>>>>         at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
>>>>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:59)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:71)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:47)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:46)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:46)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:724)
>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30000] milliseconds
>>>>         at akka.dispatch.DefaultPromise.ready(Future.scala:870)
>>>>         at akka.dispatch.DefaultPromise.result(Future.scala:874)
>>>>         at akka.dispatch.Await$.result(Future.scala:74)
>>>>         at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:81)
>>>>         ... 25 more
>>>> 
>>>> 
>>>> Before the error I can see log lines like these:
>>>> 
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 14/03/11 14:29:40 INFO MapOutputTracker: Don't have map outputs for shuffle 0, fetching them
>>>> 
>>>> Can you please help me understand what is going on? Is the whole shuffle write RDD kept in memory, so that when the cluster runs out of memory it starts garbage collecting and refetching from S3?
>>>> 
>>>> If this is the case, does Spark require additional configuration to write shuffle data to disk effectively?
>>>> 
>>>> Regards, Domen