spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: RDD cache question
Date Sun, 01 Dec 2013 03:56:22 GMT
If you're not performing any action until the very end, then there is no
need to cache at all.

Observe (and don't make the mistake of thinking that my code represents
best practice and not just an illustration):

scala> val rdd0 = sc.parallelize(1 to 4, 2)
rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> var rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
     |   println("*" * (idx + 1))
     |   itr.map(_ + 1)
     | }
starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]

scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19

scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4)

scala> rdd = res0
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19

scala> rdd.collect
*
**
res2: Array[Int] = Array(2, 3, 4, 5)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
mapPartitionsWithIndex at <console>:18

scala> rdd.collect
*
**
res3: Array[Int] = Array(2, 3, 4, 5)

scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
mapPartitionsWithIndex at <console>:18

scala> rdd.collect
*
*
**
**
res4: Array[Int] = Array(3, 4, 5, 6)

scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
mapPartitionsWithIndex at <console>:18

scala> rdd.collect
*
*
*
**
**
**
res5: Array[Int] = Array(4, 5, 6, 7)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> for (_ <- 1 to 3) { rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true) }

scala> rdd.collect
*
*
*
**
**
**
res7: Array[Int] = Array(4, 5, 6, 7)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> for (_ <- 1 to 3) {
     |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
     |   rdd.cache()
     | }

scala> rdd.collect
*
*
*
**
**
**
res9: Array[Int] = Array(4, 5, 6, 7)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> for (_ <- 1 to 3) {
     |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
     |   rdd.cache()
     |   println(rdd.sum)
     | }
*
**
14.0
*
**
18.0
*
**
22.0

scala> rdd.collect
res11: Array[Int] = Array(4, 5, 6, 7)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> var temp = rdd0
temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> for (_ <- 1 to 3) {
     |   temp = rdd
     |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
     |   rdd.cache()
     |   temp.unpersist()
     |   println(rdd.sum)
     | }
*
**
14.0
*
*
**
**
18.0
*
*
*
**
**
**
22.0

scala> rdd.collect
res13: Array[Int] = Array(4, 5, 6, 7)

scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12

scala> for (_ <- 1 to 3) {
     |   temp = rdd
     |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
     |   rdd.cache()
     |   println(rdd.sum)
     |   temp.unpersist()
     | }
*
**
14.0
*
**
18.0
*
**
22.0

scala> rdd.collect
res14: Array[Int] = Array(4, 5, 6, 7)



Take-aways:

   1. spark-shell is your friend
   2. assigning an RDD to a var doesn't modify the underlying collection;
   the var is just a reference to a particular lineage of transformations
   over that collection
   3. cache() doesn't do anything useful unless more than one action is
   performed on the cached RDD; the cache is only filled the first time an
   action computes that RDD
   4. performing gratuitous actions within a loop, just so that calling
   cache() within the loop has some immediate effect, will usually not
   change the total amount of work done outside the gratuitous jobs vs. just
   doing one action after the looped transformations
   5. trying to unpersist() prior cached iterations can work, but it is
   sensitive to where it occurs relative to actions: unpersist the previous
   iteration only after an action has computed the new one, otherwise the
   whole chain is recomputed (compare the last two loops above)
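
For what it's worth, the last loop in the transcript can be folded into a
small helper. This is only a sketch: iterateCached, start, n and step are
names I made up for illustration (none of them is part of Spark), and it
assumes that you really do need an action inside every iteration.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of Spark: applies `step` to an RDD `n` times,
// keeping only the newest iteration cached.
def iterateCached[T](start: RDD[T], n: Int)(step: RDD[T] => RDD[T]): RDD[T] = {
  var current = start
  for (_ <- 1 to n) {
    val previous = current
    current = step(previous).cache() // only marks the RDD for caching; nothing runs yet
    current.count()                  // action: computes `current` and fills its cache
    previous.unpersist()             // safe now: `current` is cached and no longer needs `previous`
  }
  current
}
```

In spark-shell, iterateCached(sc.parallelize(1 to 4, 2), 3)(_.map(_ + 1))
should yield an RDD whose collect is Array(4, 5, 6, 7), the same result as
the loops above. Note that the first pass also calls unpersist() on start
itself, which drops any cache the caller had set on it. If the only action is
the one at the very end, skip the helper entirely and just chain the
transformations.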



On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:

> Step 4 would be count() or collect(). The map() (in step 2) would be
> performing calculations and writing information to a DB.
>
> Is this the information that was missing?
>
> Thanks,
>
> Yadid
>
>
>
>
>
>
> On 11/30/13 9:24 PM, Mark Hamstra wrote:
>
> Your question doesn't really make any sense without specifying where any
> RDD actions take place (i.e. where Spark jobs are actually run.)  Without
> any actions, all you've outlined so far are different ways to specify the
> chain of transformations that should be evaluated when an action is
> eventually called and a job runs.  In a real sense your code hasn't
> actually done anything yet.
>
>
> On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>
>>
>>
>>
>> Hi All,
>>
>> I'm trying to implement the following and would like to know in which
>> places I should be calling RDD.cache():
>>
>> Suppose I have a group of RDDs : RDD1 to RDDn as input.
>>
>> 1. create a single RDD_total = RDD1.union(RDD2)..union(RDDn)
>>
>> 2. for i = 0 to x:    RDD_total = RDD_total.map (some map function());
>>
>> 3. return RDD_total.
>>
>> I think that I should cache RDD_total in order to optimize the iterations.
>> Should I just be calling RDD_total.cache() at the end of each iteration?
>> Or should I be performing something more elaborate:
>>
>>
>> RDD_temp = RDD_total.map (some map function());
>> RDD_total.unpersist();
>> RDD_total = RDD_temp.cache();
>>
>>
>>
>> Thanks,
>> Yadid
>>
>>
>>
>>
>>
>>
>>
>
>
