spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: Rolling window cache
Date Tue, 27 Aug 2013 22:58:32 GMT
Hi Grega,

> Thanks for the response; I hadn't noticed it until now.
> 1. Is there some performance penalty if we union many small RDDs? 
> 
> I will need to union 7 RDDs continuously in a loop, where 6 RDDs will be common from
> one iteration to the next. Something along these lines:
> 
> val rdds = ... // IndexedSeq of RDDs
> 
> for ( i <- 0 to n ) {
>     val rddCachedSet1 = rdds.slice(i, i + 7).map(_.cache).reduceLeft { _.union(_) }
>     // do some computations on rddCachedSet1
> }

Unioning 7 of them shouldn't be a problem, though hundreds might be.
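
For what it's worth, if the window ever does grow to many RDDs, SparkContext.union takes a whole
sequence and builds one flat UnionRDD instead of a chain of nested binary unions, which keeps the
lineage shallow. A minimal sketch, reusing your rdds and i, and assuming sc is your SparkContext:

// Flat union of the cached window (same result as the reduceLeft, shallower lineage)
val rddCachedSet1 = sc.union(rdds.slice(i, i + 7).map(_.cache()))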

> 2. Will calling cache() on an RDD multiple times cause some undesired effects? If so,
> I can probably check whether the RDD is already cached with rdd.getStorageLevel()?

Calling cache() multiple times should work as long as you set the same storage level.
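
If you'd still like to be defensive about it, checking the storage level first is cheap. A small
illustrative helper (package names assume the 0.8 line, org.apache.spark.*; in 0.7 the same
classes live under spark.*):

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Only call cache() if no storage level has been set on this RDD yet
def cacheIfNeeded[T](rdd: RDD[T]): RDD[T] =
  if (rdd.getStorageLevel == StorageLevel.NONE) rdd.cache() else rdd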

> 3. Is rdd.unpersist() planned for the next Spark release (0.8)?

Yes, it's already in the code.
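
Once you're on a build that has it, the loop from your first question can drop the RDD that falls
out of the window explicitly instead of waiting for LRU eviction. A rough sketch, reusing the
names from your snippet:

for (i <- 0 to n) {
  // Cache the current 7-RDD window and union it; calling cache() again on the
  // 6 carried-over RDDs with the same storage level is effectively a no-op
  val rddCachedSet1 = rdds.slice(i, i + 7).map(_.cache()).reduceLeft(_ union _)
  // ... do some computations on rddCachedSet1 ...
  rdds(i).unpersist()  // rdds(i) is no longer part of the next window
}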

> 4. Regarding the LRU cache that you mentioned: were you referring to the garbage collector cleaning
> up unused objects? If so, does that mean that even cached RDDs will at some point get garbage-collected?

I was saying that we remove objects from the cache (i.e. unreference them) in an LRU manner
when the cache memory becomes full. The GC then collects them some time afterward. So even if
you cache arbitrarily many RDDs, only the most recently used ones will stay in memory, and you
won't get OutOfMemoryErrors. But we are the ones controlling when we unreference them, and the
GC just picks up from there when it decides to clean things up.
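
One more caching note, in case it helps: if you'd rather have evicted partitions spill to local
disk than be recomputed from the lineage later, you can persist with MEMORY_AND_DISK instead of
the default cache() level. A sketch, reusing the rolling RDD from your first message:

import org.apache.spark.storage.StorageLevel

// Evicted partitions are written to local disk rather than dropped entirely
val rolling = sc.textFile(...).persist(StorageLevel.MEMORY_AND_DISK)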

Matei


> 
> Thanks,
> Grega
> 
> 
> On Wed, Aug 14, 2013 at 12:40 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
> Hi Grega,
> 
> You'll need to create a new cached RDD for each batch, and then create the union of those
> on every window. So, for example, if you have rdd0, rdd1, and rdd2, you might first take the
> union of 0 and 1, then of 1 and 2. This lets you use just the subset of RDDs you care
> about instead of trying to subtract stuff.
> 
> In terms of uncaching, in the master branch you can use RDD.unpersist() to uncache an
> old RDD. In Spark 0.7 there's no user control over uncaching, so you just have to wait for
> it to fall out of the LRU cache.
> 
> Matei
> 
> On Aug 13, 2013, at 4:07 AM, Grega Kešpret <grega@celtra.com> wrote:
> 
>> Hi all,
>> I need some tips on how to go about doing a "rolling window" with Spark. We would like
>> to do something like this:
>> 
>> [----------- rolling window data -----------][ new ]
>> [ old ][----------- rolling window data -----------][ new ]
>> 
>> Effectively, the size of the data in the rolling window will be much larger than the
>> additional data in each iteration, so we want to cache it in memory.
>> 
>> val rolling = sc.textFile(...).cache()
>> rolling.count()
>> // rolling window should be in cache
>> 
>> val newData = sc.textFile(...)
>> val data = rolling.union(newData).cache()
>> data.count()
>> // rolling window + new data should be in cache
>> 
>> We can add new data (and cache it) by unioning the rolling window RDD with the new RDD.
>> But how can we forget old data / remove it from the cache?
>> If it's of any help, we have the data segmented by small intervals, so we know the
>> file names beforehand.
>> 
>> Thanks,
>> Grega
>> 
> 
> 

