spark-user mailing list archives

From Josh Rosen <rosenvi...@gmail.com>
Subject Re: Out of memory building RDD on local[N]
Date Sat, 02 Nov 2013 01:19:51 GMT
I think that parallelize() keeps its list in the driver to provide
resiliency for the RDDs that it creates: Spark doesn't know the lineage
that was used to create the items passed to parallelize(), so it needs to
keep a copy of those items in the driver to allow the RDD's blocks to be
recomputed.
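
In other words, something like the following keeps the whole batch reachable
on the driver even after you drop your own reference to it (buildBatch is a
hypothetical stand-in for however the items are produced, and sc is an
existing SparkContext):

val batch: Seq[String] = buildBatch()   // ~500,000 items built on the driver heap
val rdd = sc.parallelize(batch)         // the resulting RDD retains a reference to batch
// Even if the caller clears its own reference, the items stay reachable through
// the RDD so that lost blocks can be recomputed, and therefore cannot be
// garbage collected on the driver.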

If your batches/RDDs can be generated independently, you could
parallelize() a small set of offsets (or filenames, or random number
generator seeds, or something similar), then generate the elements inside
of a flatMap() transformation on that RDD; that would look something like
this:

val offsets = Seq(1000, 2000, 3000)
sc.parallelize(offsets, offsets.size).flatMap { offset =>
   // only the offsets are kept on the driver; the data is generated inside each task
   val dataGenerator = new SomeDataGenerator(offset)
   dataGenerator.generateData()
}
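
To make that concrete with one of the options mentioned above, here's a
self-contained variant using random number generator seeds; the seed values
and the 500,000-element count are made up for illustration, and only the
seeds live on the driver:

val seeds = Seq(1L, 2L, 3L)
sc.parallelize(seeds, seeds.size).flatMap { seed =>
  val rng = new scala.util.Random(seed)
  Seq.fill(500000)(rng.nextDouble())   // generated inside the task, not on the driver
}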


An alternative would be to add a parallelize() method that accepts a
deterministic function that generates a sequence of items, allowing the
driver to retain only that function instead of the items it produces.
However, this wouldn't allow you to benefit from parallelism when loading
or generating your data.
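
Purely as a sketch of that idea, and not an existing Spark API, the signature
might look something like this:

// Hypothetical: the driver would retain only `gen`, re-invoking it and
// slicing its output whenever a partition needs to be recomputed.
def parallelizeFromFunction[T: ClassManifest](gen: () => Seq[T], numSlices: Int): RDD[T] =
  throw new UnsupportedOperationException("sketch only")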


On Fri, Nov 1, 2013 at 5:43 PM, Matt Cheah <mcheah@palantir.com> wrote:

>  So I'm doing some more reading of the ParallelCollectionRDD code.
>
>  It stores the data as a plain old list. Does this also mean that the
> data backing ParallelCollectionRDDs is always stored on the driver's heap
> as opposed to being distributed to the cluster? Can this behavior be
> overridden without explicitly saving the RDD to disk?
>
>  -Matt Cheah
>
>   From: Andrew Winings <mcheah@palantir.com>
> Date: Friday, November 1, 2013 3:51 PM
> To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>
> Cc: Mingyu Kim <mkim@palantir.com>
> Subject: Out of memory building RDD on local[N]
>
>   Hi everyone,
>
>  I'm trying to run a task where I accumulate a ~1.5GB RDD with the Spark
> URL set to local[8]. No matter what I do to this RDD, whether telling it to
> persist with StorageLevel.DISK_ONLY or un-persisting it altogether, the JVM
> always runs out of heap space.
>
>  I'm building the RDD in "batches". I build up a Java collection of
> 500,000 items, then use context.parallelize() on that collection; call
> this RDD currBatchRDD. I then perform an RDD.union of the previous-batch
> RDD (prevBatchRDD) and the parallelized collection, set prevBatchRDD to
> the union result, clear the Java collection, and continue from there.
>
>  I would expect that, both locally and on an actual Spark cluster,
> StorageLevel configurations would be respected for keeping RDDs on-heap or
> off-heap. However, my memory profile shows that the entire RDD is being
> kept on-heap in the local case. Am I misunderstanding the
> documentation?
>
>  Thanks,
>
>  -Matt Cheah
>
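
For reference, the accumulation pattern described in the quoted message boils
down to roughly the following sketch, where Record, numBatches, and buildBatch
are hypothetical stand-ins and sc is an existing SparkContext. Per the
explanation above, every parallelize() call leaves its batch referenced on the
driver, so the chain of unions keeps all of the batches in driver memory
regardless of the StorageLevel set on the result:

import org.apache.spark.rdd.RDD

case class Record(value: String)                  // hypothetical element type
def buildBatch(i: Int): Seq[Record] =             // hypothetical loading step
  Seq.fill(500000)(Record("row-" + i))
val numBatches = 10

var prevBatchRDD: RDD[Record] = sc.parallelize(Seq.empty[Record])
for (i <- 0 until numBatches) {
  val batch = buildBatch(i)                       // built on the driver heap
  val currBatchRDD = sc.parallelize(batch)        // driver retains a reference to batch
  prevBatchRDD = prevBatchRDD.union(currBatchRDD) // lineage now spans every batch so far
}
// All of the batches stay reachable from the driver through the parallelized
// RDDs, which is why persisting with DISK_ONLY or unpersisting the union does
// not shrink the driver heap.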
