spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: RDD cache question
Date Tue, 03 Dec 2013 06:19:28 GMT
LRU.  See org.apache.spark.storage.MemoryStore#ensureFreeSpace -- it just
iterates through entries in order (the entries map is a LinkedHashMap kept in
access order, so the least recently used blocks come first.)
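
To illustrate the LRU policy mentioned above, here is a minimal sketch in plain
Scala. It is not Spark's MemoryStore code: LruSketch, maxBytes and the
byte-array "blocks" are hypothetical names chosen for the example, and it
assumes the cache is a java.util.LinkedHashMap created in access order, so that
iterating from the front visits the least recently used entries first.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a block cache; NOT Spark's MemoryStore.
final class LruSketch(maxBytes: Long) {
  // Third constructor argument `true` selects access order:
  // every get/put moves the touched entry to the end of the map.
  private val entries = new JLinkedHashMap[String, Array[Byte]](32, 0.75f, true)
  private var usedBytes = 0L

  // Reading a block counts as a use, so it becomes the most recently used.
  def get(id: String): Option[Array[Byte]] = Option(entries.get(id))

  // Stores a block, evicting from the front of the map (least recently
  // used first) until it fits. Returns the ids that were evicted.
  def put(id: String, bytes: Array[Byte]): Seq[String] = {
    require(bytes.length <= maxBytes, "block is larger than the whole cache")
    // Drop any previous copy of this block before making room.
    val old = entries.remove(id)
    if (old != null) usedBytes -= old.length

    val evicted = ArrayBuffer.empty[String]
    val it = entries.entrySet().iterator()
    while (usedBytes + bytes.length > maxBytes && it.hasNext) {
      val e = it.next()
      usedBytes -= e.getValue.length
      evicted += e.getKey
      it.remove()
    }
    entries.put(id, bytes)
    usedBytes += bytes.length
    evicted.toSeq
  }
}

object LruSketchDemo {
  def main(args: Array[String]): Unit = {
    val cache = new LruSketch(maxBytes = 3)
    cache.put("a", Array[Byte](1))
    cache.put("b", Array[Byte](1))
    cache.put("c", Array[Byte](1))
    cache.get("a") // touch "a"; "b" is now the least recently used
    val evicted = cache.put("d", Array[Byte](1))
    println(evicted.mkString(", ")) // prints "b"
  }
}
```

With insertion order instead of access order (third constructor argument
false), the same loop would evict "a" here, which is FIFO rather than LRU.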



On Mon, Dec 2, 2013 at 9:08 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:

>  Thanks Mark, that makes perfect sense.
> I guess I still don't have a full picture in my head when it comes to the
> caching:
> How is the RDD cache managed (assuming not enough memory for all the
> cached RDDs): is it LRU or LFU, or something else ?
>
>
> Thanks,
>
> Yadid
>
>
>
> On 11/30/13 10:56 PM, Mark Hamstra wrote:
>
> If you're not performing any action until the very end, then there is no
> need to cache at all.
>
>  Observe (and don't make the mistake of thinking that my code represents
> best practice and not just an illustration):
>
>   scala> val rdd0 = sc.parallelize(1 to 4, 2)
>  rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> var rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
>       |   println("*" * (idx + 1))
>       |   itr.map(_ + 1)
>       | }
>  starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]
>
>   scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
>  res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
> mapPartitionsWithIndex at <console>:19
>
>   scala> rdd.collect
>  res1: Array[Int] = Array(1, 2, 3, 4)
>
>   scala> rdd = res0
>  rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
> mapPartitionsWithIndex at <console>:19
>
>   scala> rdd.collect
>  *
>  **
>  res2: Array[Int] = Array(2, 3, 4, 5)
>
>   scala> rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>  rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
> mapPartitionsWithIndex at <console>:18
>
>   scala> rdd.collect
>  *
>  **
>  res3: Array[Int] = Array(2, 3, 4, 5)
>
>   scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>  rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
> mapPartitionsWithIndex at <console>:18
>
>   scala> rdd.collect
>  *
>  *
>  **
>  **
>  res4: Array[Int] = Array(3, 4, 5, 6)
>
>   scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>  rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
> mapPartitionsWithIndex at <console>:18
>
>   scala> rdd.collect
>  *
>  *
>  *
>  **
>  **
>  **
>  res5: Array[Int] = Array(4, 5, 6, 7)
>
>   scala> rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> for (_ <- 1 to 3) { rdd =
> rdd.mapPartitionsWithIndex(starsPlusOne, true) }
>
>   scala> rdd.collect
>  *
>  *
>  *
>  **
>  **
>  **
>  res7: Array[Int] = Array(4, 5, 6, 7)
>
>   scala> rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> for (_ <- 1 to 3) {
>       |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>       |   rdd.cache()
>       | }
>
>   scala> rdd.collect
>  *
>  *
>  *
>  **
>  **
>  **
>  res9: Array[Int] = Array(4, 5, 6, 7)
>
>   scala> rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> for (_ <- 1 to 3) {
>       |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>       |   rdd.cache()
>       |   println(rdd.sum)
>       | }
>  *
>  **
>  14.0
>  *
>  **
>  18.0
>  *
>  **
>  22.0
>
>   scala> rdd.collect
>  res11: Array[Int] = Array(4, 5, 6, 7)
>
>   scala> rdd = rdd0
>  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> var temp = rdd0
>  temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
>   scala> for (_ <- 1 to 3) {
>       |   temp = rdd
>       |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>       |   rdd.cache()
>       |   temp.unpersist()
>       |   println(rdd.sum)
>       | }
>  *
>  **
>  14.0
>  *
>  *
>  **
>  **
>  18.0
>  *
>  *
>  *
>  **
>  **
>  **
>  22.0
>
>   scala> rdd.collect
>  res13: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) {
>      |   temp = rdd
>      |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>      |   rdd.cache()
>      |   println(rdd.sum)
>      |   temp.unpersist()
>      | }
> *
> **
> 14.0
> *
> **
> 18.0
> *
> **
> 22.0
>
> scala> rdd.collect
> res14: Array[Int] = Array(4, 5, 6, 7)
>
>
>
> Take-aways:
>
>    1. spark-shell is your friend
>    2. assigning an RDD to a var doesn't modify the underlying collection;
>    the var is really just a pointer into the specified lineage of
>    transformations for the underlying collection
>    3. cache() doesn't really do anything useful unless more than one
>    action is performed on the underlying collection
>    4. performing gratuitous actions within a loop just so that calling
>    cache() within the loop will have some immediate effect will usually not
>    change the total amount of work done outside the gratuitous job vs. just
>    doing one action after the looped transformations
>    5. trying to unpersist() prior cached iterations can work, but it is
>    sensitive to where it occurs relative to actions
>
>
>
> On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>
>>  step 4. would be count(), or collect(). The map() (in step 2.)  would
>> be performing calculations and writing information to a DB.
>>
>> Is this the information that was missing ?
>>
>> Thanks,
>>
>> Yadid
>>
>>
>>
>>
>>
>>
>> On 11/30/13 9:24 PM, Mark Hamstra wrote:
>>
>> Your question doesn't really make any sense without specifying where any
>> RDD actions take place (i.e. where Spark jobs are actually run.)  Without
>> any actions, all you've outlined so far are different ways to specify the
>> chain of transformations that should be evaluated when an action is
>> eventually called and a job runs.  In a real sense your code hasn't
>> actually done anything yet.
>>
>>
>> On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>>
>>>
>>>
>>>
>>> Hi All,
>>>
>>> I'm trying to implement the following and would like to know in which
>>> places I should be calling RDD.cache():
>>>
>>> Suppose I have a group of RDDs : RDD1 to RDDn as input.
>>>
>>> 1. create a single RDD_total = RDD1.union(RDD2)..union(RDDn)
>>>
>>> 2. for i = 0 to x:    RDD_total = RDD_total.map (some map function());
>>>
>>> 3. return RDD_total.
>>>
>>> I think that I should cache RDD_total in order to optimize the iterations.
>>> Should I just be calling RDD_total.cache() at the end of each iteration?
>>> Or should I be performing something more elaborate:
>>>
>>>
>>> RDD_temp = RDD_total.map (some map function());
>>> RDD_total.unpersist();
>>> RDD_total = RDD_temp.cache();
>>>
>>>
>>>
>>> Thanks,
>>> Yadid
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>
>