spark-user mailing list archives

From "" <>
Subject Re: OOM with groupBy + saveAsTextFile
Date Sun, 02 Nov 2014 04:52:33 GMT


FYI, the excerpt below describes this error. Could you post your heap size settings as well as your Spark app code?


3.1.3 Detail Message: Requested array size exceeds VM limit

“The detail message Requested array size exceeds VM limit indicates that the application (or
APIs used by that application) attempted to allocate an array that is larger than the heap
size. For example, if an application attempts to allocate an array of 512MB but the maximum
heap size is 256MB then OutOfMemoryError will be thrown with the reason Requested array size
exceeds VM limit. In most cases the problem is either a configuration issue (heap size too
small), or a bug that results in an application attempting to create a huge array, for example,
when the number of elements in the array are computed using an algorithm that computes an
incorrect size.”
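
A minimal standalone illustration of that failure (my own sketch, unrelated to your job): on a typical 64-bit HotSpot JVM the per-array length cap sits just below Integer.MAX_VALUE, so a request at or above that typically fails with exactly this message rather than the usual "Java heap space".

    // Hypothetical repro of "Requested array size exceeds VM limit"; not from the thread.
    object ArraySizeLimitDemo {
      def main(args: Array[String]): Unit = {
        // On typical 64-bit HotSpot this length exceeds the per-array cap and throws
        // java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        val huge = new Array[Byte](Int.MaxValue)
        println(huge.length) // not reached
      }
    }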

On 2 Nov, 2014, at 12:25 pm, Bharath Ravi Kumar <> wrote:

> Resurfacing the thread. Surely an OOM shouldn't be the norm for a common groupBy / sort use case
in a framework that is leading the sorting benchmarks? Or is there something fundamentally
wrong in the usage?
> On 02-Nov-2014 1:06 am, "Bharath Ravi Kumar" <> wrote:
> Hi,
> I'm trying to run groupBy(function) followed by saveAsTextFile on an RDD of ~100
million records. The data size is 20GB, and the groupBy results in an RDD of 1061 keys with values being
Iterable<Tuple4<String, Integer, Double, String>>. The job runs on 3 hosts in
a standalone setup, with each host's executor having 100G RAM and 24 cores dedicated to it.
While the groupBy stage completes successfully with ~24GB of shuffle write, the saveAsTextFile
stage fails after repeated retries, each attempt ending in an out of memory error [1].
I understand that a few partitions may be overloaded as a result of the groupBy, and I've tried
the following config combinations unsuccessfully (a rough sketch of the job's shape follows the two attempts):
> 1) Repartition the initial RDD (44 input partitions but 1061 keys) across 1061 partitions
and set max cores = 3 so that each key is a "logical" partition (though many partitions will
end up on very few hosts), and each host likely runs saveAsTextFile on a single key at a time
due to max cores = 3 with 3 hosts in the cluster. The level of parallelism is unspecified.
> 2) Leave max cores unspecified, set the level of parallelism to 72, and leave number
of partitions unspecified (in which case the # input partitions was used, which is 44)
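> In outline, the job looks something like the sketch below (simplified; input parsing, paths and the app object are placeholders rather than the actual code):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    // Simplified sketch of the job described above; parsing and paths are placeholders.
    object GroupBySave {
      def main(args: Array[String]): Unit = {
        // spark.cores.max / default parallelism are set via the conf or spark-submit as in (1)/(2)
        val sc = new SparkContext(new SparkConf().setAppName("groupBy-saveAsTextFile"))

        // ~100 million records, ~20GB of input, 44 input partitions
        val records: RDD[(String, Int, Double, String)] =
          sc.textFile("hdfs:///placeholder/input")
            .map { line =>
              val f = line.split('\t')
              (f(0), f(1).toInt, f(2).toDouble, f(3))
            }

        // Attempt (1): one partition per key (1061 keys)
        val grouped = records
          .repartition(1061)
          .groupBy(_._1) // RDD[(String, Iterable[(String, Int, Double, String)])]

        grouped.saveAsTextFile("hdfs:///placeholder/output") // fails with the OOM in [1]
      }
    }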
> Since I do not intend to cache RDDs, I have set in both cases.
> My understanding is that if each host is processing a single logical partition in saveAsTextFile
and is reading from other hosts to write out the RDD, it is unlikely to run out
of memory. My interpretation of the Spark tuning guide is that the degree of parallelism has
little impact in case (1) above, since max cores = number of hosts. Can someone explain why
there are still OOMs with 100G available? On a related note, intuitively (though I
haven't read the source), it appears that an entire key-value pair needn't fit into the memory
of a single host for saveAsTextFile, since a single shuffle read from a remote host can be written
to HDFS before the next remote read is carried out. That way, not all data needs to be collected
at the same time.
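> For concreteness, the failing path in [1] goes through Tuple2.toString and StringBuilder, i.e. each (key, Iterable) record appears to be rendered as one string before being written; a rough sketch of that effect (my reading of the trace, not the actual Spark source):

    // Rough sketch of the per-record write path implied by the trace in [1]; not actual Spark code.
    // The whole record is stringified at once, so a (key, Iterable[...]) pair with a very large
    // Iterable becomes a single huge string whose backing char array can exceed the VM's array size limit.
    def writeRecord(record: (String, Iterable[(String, Int, Double, String)]),
                    out: java.io.Writer): Unit = {
      out.write(record.toString) // Tuple2.toString -> StringBuilder.append -> Arrays.copyOf in [1]
      out.write("\n")
    }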
> Lastly, if an OOM is (but shouldn't be) a common occurrence (as per the tuning guide
and even as per Datastax's Spark introduction), there may need to be more documentation around
the internals of Spark to help users make better-informed tuning decisions about parallelism,
max cores, number of partitions and other tunables. Is there any ongoing effort on that front?
> Thanks,
> Bharath
> [1] OOM stack trace and logs
> 14/11/01 12:26:37 WARN TaskSetManager: Lost task 61.0 in stage 36.0 (TID 1264,
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>         java.util.Arrays.copyOf(
>         java.lang.AbstractStringBuilder.expandCapacity(
>         java.lang.AbstractStringBuilder.ensureCapacityInternal(
>         java.lang.AbstractStringBuilder.append(
>         java.lang.StringBuilder.append(
>         scala.collection.mutable.StringBuilder.append(StringBuilder.scala:197)
>         scala.Tuple2.toString(Tuple2.scala:22)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158)
>         scala.collection.Iterator$$anon$
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
>         org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.executor.Executor$
>         java.util.concurrent.ThreadPoolExecutor.runWorker(
>         java.util.concurrent.ThreadPoolExecutor$
> 14/11/01 12:26:40 WARN TaskSetManager: Lost task 92.0 in stage 36.0 (TID 1295,
FetchFailed(BlockManagerId(1,, 43704, 0), shuffleId=0, mapId=13, reduceId=92)
> 14/11/01 12:26:40 INFO DAGScheduler: Marking Stage 36 (saveAsTextFile at
as failed due to a fetch failure from Stage 37 (groupBy at
> 14/11/01 12:26:40 INFO DAGScheduler: Stage 36 (saveAsTextFile at
failed in 55.259 s
> 14/11/01 12:26:40 INFO DAGScheduler: Resubmitting Stage 37 (groupBy at
and Stage 36 (saveAsTextFile at due to fetch failure
