spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: java.lang.StackOverflowError when calling count()
Date Sun, 27 Jul 2014 03:03:07 GMT
Responses inline.

On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 <lalit@sigmoidanalytics.com> wrote:
> Hi,
> Thanks TD for your reply. I am still not able to resolve the problem for my
> use case.
> I have let's say 1000 different RDD's, and I am applying a transformation
> function on each RDD and I want the output of all rdd's combined to a single
> output RDD. For, this I am doing the following:
>
> *<Loop Start>*
> tempRDD = jaRDD.rdd().repartition(1).mapPartitions(....).toJavaRDD();
> *//creating new rdd in every loop*
> outRDD = outRDD.union(tempRDD); *//keep joining RDD's to get the output into
> a single RDD*
>
> *//after every 10 iteration, in order to truncate the lineage*
> cachRDD = outRDD.cache();
> cachRDD.checkpoint();
> System.out.println(cachRDD.collect().size());
> outRDD = new JavaRDD<String>(cachRDD.rdd(),cachRDD.classTag());
> *<Loop Ends>*
>
> *//finally after whole computation*
> outRDD.saveAsTextFile(..)
>
> The above operations is overall slow, runs successfully when performed less
> iterations i.e. ~100. But, when the num of iterations in increased to ~1000,
> The whole job is taking more than *30 mins* and ultimately break down giving
> OutOfMemory error. The total size of data is around 1.4 MB. As of now, I am
> running the job on spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output
RDDs that are generated repeatedly. Every iteration builds a new union
RDD that contains the data of the previous union RDD plus some new
data. Since each of these union RDDs is cached, the underlying data is
cached repeatedly. So the cached data grows like this:
Iteration 1: union RDD: X MB    |  Total size cached: X MB
Iteration 2: union RDD: 2X MB   |  Total size cached: 3X MB
Iteration 3: union RDD: 3X MB   |  Total size cached: 6X MB
Iteration 4: union RDD: 4X MB   |  Total size cached: 10X MB
...

If you do the math, that is a quadratic increase in the size of the
data being processed and cached with respect to the number of
iterations. This increases both run time and memory usage.
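
To make the arithmetic concrete, here is a minimal sketch in plain Java
(no Spark involved). It assumes every iteration adds the same amount X
and that none of the earlier cached union RDDs is ever dropped. The
class and method names are made up for illustration.

```java
// Toy arithmetic only; sizes are expressed in units of X MB.
final class CacheGrowth {
    // Size of the union RDD built in the given iteration: n * X.
    static long unionSizeUnits(int iteration) {
        return iteration;
    }

    // Total cached after n iterations if every union RDD stays cached:
    // X + 2X + ... + nX = n(n+1)/2 * X.
    static long totalCachedUnits(int iterations) {
        long total = 0;
        for (int i = 1; i <= iterations; i++) {
            total += unionSizeUnits(i);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 2, 3, 4, 100, 1000}) {
            System.out.println("iterations=" + n
                    + " total cached=" + totalCachedUnits(n) + "X MB");
        }
    }
}
```

Under these assumptions the total after 1000 iterations is 500500X, so
even a small X adds up quickly.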


> I also observed that when collect() operation is performed, the number of
> tasks keeps on increasing as the loop proceeds, like on first collect() 22
> total task, then ~40 total tasks ... ~300 task for single collect.
> Does this means that all the operations are repeatedly performed, and RDD
> lineage is not broken??
>
Same reason as above. Each union RDD is built by appending the new set
of partitions (~22 partitions) to the partitions of the previous union
RDD. So the Nth union RDD has N * 22 partitions, and hence that many
tasks.
You could change this by also repartitioning when you cache+checkpoint
the union RDD (in effect: outRDD.repartition(100), then cache(),
checkpoint() and count() on the result; checkpoint() returns nothing,
so the calls cannot literally be chained in one expression).
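
As a rough illustration of the partition counts, here is a plain Java
sketch (no Spark involved). It assumes each new RDD contributes 22
partitions and that the union RDD is repartitioned to 100 partitions
every 10 iterations. The names and the schedule are hypothetical, and
the model ignores the extra shuffle stage that repartition() adds.

```java
// Toy model of how many partitions the union RDD has at each iteration.
final class PartitionGrowth {
    // repartitionEvery <= 0 means "never repartition".
    static int[] unionPartitionsPerIteration(int iterations, int newPartitions,
                                             int repartitionEvery, int repartitionTo) {
        int[] seen = new int[iterations];
        int current = 0;
        for (int i = 1; i <= iterations; i++) {
            current += newPartitions;   // union appends the new RDD's partitions
            seen[i - 1] = current;      // partitions of the union RDD right now
            if (repartitionEvery > 0 && i % repartitionEvery == 0) {
                current = repartitionTo; // repartition before cache + checkpoint
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] plain = unionPartitionsPerIteration(1000, 22, 0, 0);
        int[] capped = unionPartitionsPerIteration(1000, 22, 10, 100);
        System.out.println("iteration 1000 without repartition: " + plain[999]);
        System.out.println("iteration 1000 with repartition:    " + capped[999]);
    }
}
```

In this model the count without repartitioning keeps growing with the
iteration number, while with repartitioning it never exceeds
100 + 10 * 22 = 320.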

And do you really need all the data to be collected at the driver? If
you are calling cachRDD.collect() just to force the checkpoint, then
use cachRDD.count() instead.

>
> Can you please elaborate on the point from your last post i.e. how to
> perform: "*Create a modified RDD R` which has the same data as RDD R but
> does not have the lineage. This is done by creating a new BlockRDD using the
> ids of blocks of data representing the in-memory R*"
>
Please refer to the lines in this function:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74
What those lines do is save the data of the associated RDD to HDFS
files, and then create a new CheckpointRDD from the same files. Then
the dependency of the associated RDD is changed to use the new RDD.
This truncates the lineage because the associated RDD's parent is now
the new RDD, which has a very short lineage (links to checkpoint
files), and the previous dependencies (parent RDDs) are forgotten.
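
The effect of that dependency swap can be sketched with a toy model in
plain Java. This is not Spark's code: LineageNode, depth() and
checkpoint() are hypothetical names that only mimic the idea of
replacing a long parent chain with a single node that stands for the
checkpoint files.

```java
// Toy model of lineage truncation; it does not use or mirror Spark classes.
final class LineageNode {
    private final String name;
    private LineageNode parent; // null means "reads directly from storage"

    LineageNode(String name, LineageNode parent) {
        this.name = name;
        this.parent = parent;
    }

    // Number of nodes from this one back to the root of its lineage.
    int depth() {
        int d = 1;
        for (LineageNode n = parent; n != null; n = n.parent) {
            d++;
        }
        return d;
    }

    // Swap the parent for a fresh leaf standing in for the checkpoint files.
    // The old parent chain is no longer referenced from this node.
    void checkpoint() {
        this.parent = new LineageNode(name + "-checkpoint", null);
    }

    public static void main(String[] args) {
        LineageNode node = new LineageNode("rdd-0", null);
        for (int i = 1; i < 1000; i++) {
            node = new LineageNode("rdd-" + i, node);
        }
        System.out.println("depth before checkpoint: " + node.depth());
        node.checkpoint();
        System.out.println("depth after checkpoint:  " + node.depth());
    }
}
```

In the toy model a chain of 1000 nodes collapses to depth 2 after
checkpoint(): the node itself plus the stand-in for the checkpoint
files.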

This implementation can be modified by forcing the data of the
associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2. And
then instead of CheckpointRDD, you can create a new BlockRDD (using
the names of the blocks that are used to cache the RDD), which is then
set as the new dependency. This is definitely a behind-the-public API
implementation, that is


>
>
> -----
> Lalit Yadav
> lalit@sigmoidanalytics.com
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p10488.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
