spark-user mailing list archives

From Grega Kešpret <gr...@celtra.com>
Subject Re: Rolling window cache
Date Mon, 26 Aug 2013 12:02:02 GMT
Hi Matei,

Thanks for the response; I hadn't noticed it until now.
1. Is there a performance penalty if we union many small RDDs?

I will need to union 7 RDDs repeatedly in a loop, where 6 of the RDDs will
be common from one iteration to the next. Something along these lines:

val rdds = ... // IndexedSeq of RDDs, one per batch

for (i <- 0 to n) {
    // cache the 7 batches in the current window and union them
    val rddCachedSet1 = rdds.slice(i, i + 7).map(_.cache()).reduceLeft(_ union _)
    // do some computations on rddCachedSet1
}

2. Will calling cache() on an RDD multiple times cause any undesired
effects? If so, I can probably check whether the RDD is already cached with
rdd.getStorageLevel()?
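
For example, with a guard along these lines (just a sketch; I'm assuming
StorageLevel.NONE is what an uncached RDD reports, and the 0.7 package names):

import spark.RDD
import spark.storage.StorageLevel

// cache an RDD only if it isn't already marked for caching
def cacheOnce[T](rdd: RDD[T]): RDD[T] =
    if (rdd.getStorageLevel == StorageLevel.NONE) rdd.cache() else rdd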

3. Is rdd.unpersist() planned for the next Spark release (0.8)?
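
If it is, I imagine evicting the batch that falls out of the window by
reworking the loop above, roughly like this (rough sketch, assuming the
master-branch unpersist() API):

for (i <- 0 to n) {
    val window = rdds.slice(i, i + 7).map(_.cache()).reduceLeft(_ union _)
    // do some computations on window
    rdds(i).unpersist() // batch i is not part of the next window
}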

4. Regarding the LRU cache you mentioned: were you referring to the garbage
collector cleaning up unused objects? If so, does that mean that even
cached RDDs will at some point get garbage-collected?

Thanks,
Grega


On Wed, Aug 14, 2013 at 12:40 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:

> Hi Grega,
>
> You'll need to create a new cached RDD for each batch, and then create the
> union of those on every window. So for example, if you have rdd0, rdd1, and
> rdd2, you might first take the union of 0 and 1, then of 1 and 2. This will
> let you use just the subset of RDDs you care about instead of trying to
> subtract stuff.
>
> In terms of uncaching, in the master branch, you can use RDD.unpersist()
> to uncache an old RDD. In Spark 0.7, there's no user control over uncaching
> so you just have to wait for it to fall out of the LRU cache.
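>
> A rough sketch of this (untested; the file names are just placeholders):
>
> val rdd0 = sc.textFile("batch0").cache()
> val rdd1 = sc.textFile("batch1").cache()
> val rdd2 = sc.textFile("batch2").cache()
>
> val window1 = rdd0.union(rdd1) // window over batches 0 and 1
> window1.count()                // materialize, pulling both into the cache
> val window2 = rdd1.union(rdd2) // next window reuses rdd1's cached blocks
> rdd0.unpersist()               // master branch only: rdd0 fell out of the window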
>
> Matei
>
> On Aug 13, 2013, at 4:07 AM, Grega Kešpret <grega@celtra.com> wrote:
>
> Hi all,
> I could use some tips on how to go about doing a "rolling window"
> with Spark. We would like to do something like this:
>
> [----------- rolling window data -----------][ new ]
> [ old ][----------- rolling window data -----------][ new ]
>
> Effectively, the data in the rolling window will be much larger than the
> new data added in each iteration, so we want to cache it in memory.
>
> val rolling = sc.textFile(...).cache()
> rolling.count()
> // rolling window should be in cache
>
> val newData = sc.textFile(...) // "new" is a reserved word in Scala
> val data = rolling.union(newData).cache()
> data.count()
> // rolling window + new data should be in cache
>
> We can add new data (and cache it) by unioning the rolling window RDD with
> a new RDD. But how can we forget old data / remove it from the cache?
> If it's of any help, the data is segmented into small intervals, so we
> know the file names beforehand.
>
> Thanks,
> Grega
>
