spark-user mailing list archives

From Grega Kešpret <gr...@celtra.com>
Subject Re: Rolling window cache
Date Wed, 28 Aug 2013 09:07:41 GMT
I see. Thanks for clearing things up. Looking forward to the 0.8 release.

Grega


On Wed, Aug 28, 2013 at 12:58 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:

> Hi Grega,
>
> Thanks for the response, I hadn't noticed it until now.
> 1. Is there some performance penalty if we union many small RDDs?
>
> I will need to union 7 RDDs continuously in a loop, where 6 RDDs will be
> common from one iteration to the next. Something along these lines:
>
> val rdds = ... // IndexedSeq of RDDs
>
> for ( i <- 0 to n ) {
>     val rddCachedSet1 = rdds.slice(i, i + 7).map(_.cache).reduceLeft { _.union(_) }
>     // do some computations on rddCachedSet1
> }
>
>
> Unioning 7 of them shouldn't be a problem, though hundreds might be.
>
> 2. Will calling cache() on an RDD multiple times cause any undesired
> effects? If so, I can probably check whether the RDD is already cached with
> rdd.getStorageLevel()?
>
>
> Calling cache() multiple times should work as long as you set the same
> storage level.
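>
> If you'd rather avoid the repeated call, a rough sketch of the check you mentioned (the import path assumes the 0.8 package layout; in 0.7 it's spark.storage.StorageLevel):
>
> import org.apache.spark.storage.StorageLevel
>
> // Persist only if no storage level has been set on this RDD yet.
> if (rdd.getStorageLevel == StorageLevel.NONE) {
>   rdd.persist(StorageLevel.MEMORY_ONLY)  // the same level cache() uses
> }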
>
> 3. Is rdd.unpersist() planned for the next Spark release (0.8)?
>
>
> Yes, it's already in the code.
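>
> Once it's in a release, a rough sketch of how it could slot into the loop from your example above (same rdds, n, and window size of 7):
>
> for (i <- 0 to n) {
>   val window = rdds.slice(i, i + 7)
>   window.foreach(_.cache())
>   val combined = window.reduceLeft(_ union _)
>   // do some computations on combined
>   if (i > 0) rdds(i - 1).unpersist()  // explicitly drop the RDD that just left the window
> }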
>
> 4. Regarding the LRU cache that you mentioned: were you referring to the
> garbage collector cleaning up unused objects? If so, does that mean that even
> cached RDDs will at some point get garbage-collected?
>
>
> I was saying that we remove objects from the cache (i.e. unreference
> them) in an LRU manner when the cache memory becomes full. The GC will
> then collect them some time afterward. So even if you cache arbitrarily
> many RDDs, only the most recently used ones will stay in memory, and you
> won't get OutOfMemoryErrors. But we are the ones controlling when we
> unreference them, and the GC just picks up from there when it decides to
> clean stuff up.
>
> Matei
>
>
>
> Thanks,
> Grega
>
>
> On Wed, Aug 14, 2013 at 12:40 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>
>> Hi Grega,
>>
>> You'll need to create a new cached RDD for each batch, and then create
>> the union of those on every window. So for example, if you have rdd0, rdd1,
>> and rdd2, you might first take the union of 0 and 1, then of 1 and 2. This
>> will let you use just the subset of RDDs you care about instead of trying
>> to subtract stuff.
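>>
>> For illustration, a minimal sketch of that with three batches (the file paths are placeholders):
>>
>> val rdd0 = sc.textFile("batch-0").cache()
>> val rdd1 = sc.textFile("batch-1").cache()
>> rdd0.union(rdd1).count()  // first window: batches 0 and 1
>>
>> val rdd2 = sc.textFile("batch-2").cache()
>> rdd1.union(rdd2).count()  // next window reuses the cached rdd1 and drops rdd0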
>>
>> In terms of uncaching, in the master branch, you can use RDD.unpersist()
>> to uncache an old RDD. In Spark 0.7, there's no user control over uncaching
>> so you just have to wait for it to fall out of the LRU cache.
>>
>> Matei
>>
>> On Aug 13, 2013, at 4:07 AM, Grega Kešpret <grega@celtra.com> wrote:
>>
>> Hi all,
>> I could use some tips on how to go about doing a "rolling window"
>> with Spark. We would like to do something like this:
>>
>> [----------- rolling window data -----------][ new ]
>> [ old ][----------- rolling window data -----------][ new ]
>>
>> Effectively, the size of the data in rolling window will be much larger
>> than additional data in each iteration, so we want to cache it in memory.
>>
>> val rolling = sc.textFile(...).cache()
>> rolling.count()
>> // rolling window should be in cache
>>
>> val newData = sc.textFile(...)  // `new` is a reserved word in Scala
>> val data = rolling.union(newData).cache()
>> data.count()
>> // rolling window + new data should be in cache
>>
>> We can add new data (and cache it) by unioning the rolling window RDD with
>> the new RDD. But how can we forget old data / remove it from the cache?
>> If it's of any help, we have the data segmented by small intervals so we
>> know the file names beforehand.
>>
>> Thanks,
>> Grega
>>
>>
>>
>
>
