spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kyle Ellrott <>
Subject Re: SPARK-942
Date Wed, 06 Nov 2013 18:58:55 GMT
I think the usage has to be calculated as the iterator is being put into
the arraybuffer.
Right now, the BlockManager, in it's put method when it gets an iterator
named 'values' uses the simple stanza of:

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
tellMaster: Boolean)
    : Long = {
    val elements = new ArrayBuffer[Any]
    elements ++= values
    put(blockId, elements, level, tellMaster)

Completely unrolling the iterator in a single line.  Above it, the
CacheManager does the exact same thing with:

val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)

We would probably have to implement some sort of 'IteratorBuffer' class,
which would wrap an iterator. It would include a method to unroll an
iterator into a buffer up to a point, something like

def unroll(maxMem:Long) : Boolean ={ ...}

And it would return True if the maxMem was hit. At which point BlockManager
could read through the already cached values, then continue on through the
rest of the iterators dumping all the values to file. If it unrolled
without hitting maxMem (which would probably be most of the time), the
class would simply wrap the ArrayBuffer of cached values.


On Sun, Nov 3, 2013 at 12:50 AM, Reynold Xin <> wrote:

> It's not a very elegant solution, but one possibility is for the
> CacheManager to check whether it will have enough space. If it is running
> out of space, skips buffering the output of the iterator & directly write
> the output of the iterator to disk (if storage level allows that).
> But it is still tricky to know whether we will run out of space before we
> even start running the iterator. One possibility is to use sizing data from
> previous partitions to estimate the size of the current partition (i.e.
> estimated in memory size = avg of current in-memory size / current input
> size).
> Do you have any ideas on this one, Kyle?
> On Sat, Oct 26, 2013 at 10:53 AM, Kyle Ellrott <
> >wrote:
> > I was wondering if anybody had any thoughts on the best way to tackle
> > SPARK-942 ( ).
> > Basically, Spark takes an iterator from a flatmap call and because I tell
> > it that it needs to persist Spark proceeds to push it all into an array
> > before deciding that it doesn't have enough memory and trying to
> serialize
> > it to disk, and somewhere along the line it runs out of memory. For my
> > particular operation, the function return an iterator that reads data out
> > of a file, and the size of the files passed to that function can vary
> > greatly (from a few kilobytes to a few gigabytes). The funny thing is
> that
> > if I do a strait 'map' operation after the flat map, everything works,
> > because Spark just passes the iterator forward and never tries to expand
> > the whole thing into memory. But I need do a reduceByKey across all the
> > records, so I'd like to persist to disk first, and that is where I hit
> this
> > snag.
> > I've already setup a unit test to replicate the problem, and I know the
> > area of the code that would need to be fixed.
> > I'm just hoping for some tips on the best way to fix the problem.
> >
> > Kyle
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message