spark-user mailing list archives

From Yadid Ayzenberg <ya...@media.mit.edu>
Subject Re: RDD cache question
Date Tue, 03 Dec 2013 05:08:03 GMT
Thanks Mark, that makes perfect sense.
I guess I still don't have a full picture in my head when it comes to 
the caching:
How is the RDD cache managed (assuming there is not enough memory for all 
the cached RDDs): is it LRU, LFU, or something else?


Thanks,

Yadid


On 11/30/13 10:56 PM, Mark Hamstra wrote:
> If you're not performing any action until the very end, then there is 
> no need to cache at all.
>
> Observe (and don't mistake my code for best practice; it is just an 
> illustration):
>
>     scala> val rdd0 = sc.parallelize(1 to 4, 2)
>     rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> var rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
>          |   println("*" * (idx + 1))
>          |   itr.map(_ + 1)
>          | }
>     starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]
>
>     scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
>     res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
>     mapPartitionsWithIndex at <console>:19
>
>     scala> rdd.collect
>     res1: Array[Int] = Array(1, 2, 3, 4)
>
>     scala> rdd = res0
>     rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
>     mapPartitionsWithIndex at <console>:19
>
>     scala> rdd.collect
>     *
>     **
>     res2: Array[Int] = Array(2, 3, 4, 5)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>     rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
>     mapPartitionsWithIndex at <console>:18
>
>     scala> rdd.collect
>     *
>     **
>     res3: Array[Int] = Array(2, 3, 4, 5)
>
>     scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>     rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
>     mapPartitionsWithIndex at <console>:18
>
>     scala> rdd.collect
>     *
>     *
>     **
>     **
>     res4: Array[Int] = Array(3, 4, 5, 6)
>
>     scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>     rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
>     mapPartitionsWithIndex at <console>:18
>
>     scala> rdd.collect
>     *
>     *
>     *
>     **
>     **
>     **
>     res5: Array[Int] = Array(4, 5, 6, 7)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> for (_ <- 1 to 3) { rdd =
>     rdd.mapPartitionsWithIndex(starsPlusOne, true) }
>
>     scala> rdd.collect
>     *
>     *
>     *
>     **
>     **
>     **
>     res7: Array[Int] = Array(4, 5, 6, 7)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> for (_ <- 1 to 3) {
>          |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>          |   rdd.cache()
>          | }
>
>     scala> rdd.collect
>     *
>     *
>     *
>     **
>     **
>     **
>     res9: Array[Int] = Array(4, 5, 6, 7)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> for (_ <- 1 to 3) {
>          |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>          |   rdd.cache()
>          |   println(rdd.sum)
>          | }
>     *
>     **
>     14.0
>     *
>     **
>     18.0
>     *
>     **
>     22.0
>
>     scala> rdd.collect
>     res11: Array[Int] = Array(4, 5, 6, 7)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> var temp = rdd0
>     temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> for (_ <- 1 to 3) {
>          |   temp = rdd
>          |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>          |   rdd.cache()
>          |   temp.unpersist()
>          |   println(rdd.sum)
>          | }
>     *
>     **
>     14.0
>     *
>     *
>     **
>     **
>     18.0
>     *
>     *
>     *
>     **
>     **
>     **
>     22.0
>
>     scala> rdd.collect
>
>     res13: Array[Int] = Array(4, 5, 6, 7)
>
>     scala> rdd = rdd0
>     rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
>     parallelize at <console>:12
>
>     scala> for (_ <- 1 to 3) {
>          |   temp = rdd
>          |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>          |   rdd.cache()
>          |   println(rdd.sum)
>          |   temp.unpersist()
>          | }
>     *
>     **
>     14.0
>     *
>     **
>     18.0
>     *
>     **
>     22.0
>
>     scala> rdd.collect
>     res14: Array[Int] = Array(4, 5, 6, 7)
>
>
>
> Take-aways:
>
>  1. spark-shell is your friend
>  2. assigning an RDD to a var doesn't modify the underlying
>     collection; the var is just a reference to a particular lineage
>     of transformations over the underlying collection
>  3. cache() doesn't really do anything useful unless more than one
>     action is performed on the underlying collection (see the sketch
>     after this list)
>  4. performing gratuitous actions within a loop, just so that calling
>     cache() inside the loop has some immediate effect, usually won't
>     change the total amount of work done outside those gratuitous
>     jobs vs. just doing one action after the looped transformations
>  5. trying to unpersist() prior cached iterations can work, but it is
>     sensitive to where it occurs relative to actions
>
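> A minimal sketch of point 3, assuming the same spark-shell session (sc,
> rdd0 and starsPlusOne as defined above) and that the cached partitions
> fit in memory: cache() only starts paying off once a second action
> reuses the same RDD.
>
>     val cached = rdd0.mapPartitionsWithIndex(starsPlusOne, true).cache()
>     cached.count()    // first action: partitions are computed (stars print) and cached
>     cached.collect()  // second action: served from the cache, so no stars this time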
>
>
> On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>
>     step 4. would be count(), or collect(). The map() (in step 2.) 
>     would be performing calculations and writing information to a DB.
>
>     Is this the information that was missing?
>
>     Thanks,
>
>     Yadid
>
>
>
>
>
>
>     On 11/30/13 9:24 PM, Mark Hamstra wrote:
>>     Your question doesn't really make any sense without specifying
>>     where any RDD actions take place (i.e., where Spark jobs
>>     actually run). Without any actions, all you've outlined so far
>>     are different ways to specify the chain of transformations that
>>     should be evaluated when an action is eventually called and a job
>>     runs.  In a real sense your code hasn't actually done anything yet.
>>
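>>     A minimal sketch of that distinction (placeholder code, not the
>>     original job): nothing below touches the cluster until the action
>>     on the last line runs.
>>
>>         val mapped = sc.parallelize(1 to 4).map(_ + 1)   // transformation: lazy
>>         val evens  = mapped.filter(_ % 2 == 0)           // transformation: still lazy
>>         evens.count()                                    // action: a Spark job actually runs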
>>
>>     On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>>
>>
>>
>>
>>         Hi All,
>>
>>         I'm trying to implement the following and would like to know
>>         in which places I should be calling RDD.cache():
>>
>>         Suppose I have a group of RDDs : RDD1 to RDDn as input.
>>
>>         1. create a single RDD_total = RDD1.union(RDD2)...union(RDDn)
>>
>>         2. for i = 0 to x:    RDD_total = RDD_total.map (some map
>>         function());
>>
>>         3. return RDD_total.
>>
>>         I assume that I should cache RDD_total in order to optimize the
>>         iterations. Should I just be calling RDD_total.cache() at the
>>         end of each iteration? Or should I be performing something
>>         more elaborate:
>>
>>
>>         RDD_temp = RDD_total.map (some map function());
>>         RDD_total.unpersist();
>>         RDD_total = RDD_temp.cache();
>>
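>>         A sketch of the whole flow in Scala (here rdds stands for the
>>         input Seq of RDD1..RDDn, and someMapFunction and x are
>>         placeholders), just to make the question concrete:
>>
>>         var rddTotal = rdds.reduce(_ union _)       // 1. build RDD_total
>>         for (_ <- 1 to x) {                         // 2. iterate the map
>>           val next = rddTotal.map(someMapFunction).cache()
>>           rddTotal.unpersist()                      //    drop the previous iteration
>>           rddTotal = next
>>         }
>>         rddTotal                                    // 3. nothing has run yet; only an
>>                                                     //    action (count, collect, ...)
>>                                                     //    actually triggers work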
>>
>>
>>         Thanks,
>>         Yadid
>>
>>
>>
>>
>>
>>
>>
>
>

