spark-dev mailing list archives

From Zhan Zhang <zzh...@hortonworks.com>
Subject Re: RDD.map does not allowed to preservesPartitioning?
Date Thu, 26 Mar 2015 21:54:44 GMT
Thanks, Jonathan. You are right about rewriting the example.

What I mean is providing such an option to developers so that the behavior is controllable. The example may seem
silly, and I don’t know what the real use cases are.

But, for example, suppose I want to use both the key and the value to generate a new value
while keeping the key untouched. mapValues cannot do this, because its function only receives the value.

Changing the code to allow this would be trivial, but I don’t know whether there is a particular
reason it is not allowed.
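A minimal sketch of one existing workaround, assuming Spark 1.3 or later: RDD.mapPartitions already accepts a preservesPartitioning flag, so a function that reads both the key and the value but returns the key unchanged can keep the partitioner produced by reduceByKey. The object name, the local[2] master, and the value formula k * 10 + v are illustrative assumptions. Spark does not check the promise; passing true while actually changing keys would make the following reduceByKey produce wrong results.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical example object; assumes Spark 1.3+ (pair-RDD implicits need no extra import).
object PreservePartitioningSketch {
  // Returns the final partitioner, the lineage string, and the rows sorted by key.
  def run(sc: SparkContext): (Option[Partitioner], String, Seq[(Int, Int)]) = {
    val counts = sc.parallelize(List(1, 2, 3, 4, 5, 5, 6, 6, 7, 8, 9, 10, 2, 4))
      .map((_, 1))
      .reduceByKey(_ + _) // the only shuffle: result is hash-partitioned by key

    // The new value uses both key and value, but the key is returned unchanged,
    // so we promise Spark that the existing partitioning is still valid.
    val adjusted = counts.mapPartitions(
      iter => iter.map { case (k, v) => (k, k * 10 + v) },
      preservesPartitioning = true)

    // The partitioner already matches, so this combines within partitions without a shuffle.
    val result = adjusted.reduceByKey(_ + _)
    (result.partitioner, result.toDebugString, result.collect().toSeq.sortBy(_._1))
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preserve-partitioning").setMaster("local[2]"))
    try {
      val (_, debug, rows) = run(sc)
      println(debug) // lineage should contain a single ShuffledRDD
      rows.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The design choice here is that the developer, not Spark, vouches for the key being unchanged; that is exactly the guarantee mapValues gets for free from its signature.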

Thanks.

Zhan Zhang



On Mar 26, 2015, at 2:49 PM, Jonathan Coveney <jcoveney@gmail.com> wrote:

I believe that if you do the following, the second reduceByKey no longer needs a shuffle:

sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4)).map((_,1)).reduceByKey(_+_).mapValues(_+1).reduceByKey(_+_).toDebugString

(8) MapPartitionsRDD[34] at reduceByKey at <console>:23 []
 |  MapPartitionsRDD[33] at mapValues at <console>:23 []
 |  ShuffledRDD[32] at reduceByKey at <console>:23 []
 +-(8) MapPartitionsRDD[31] at map at <console>:23 []
    |  ParallelCollectionRDD[30] at parallelize at <console>:23 []

The difference is that Spark has no way to know that your map closure doesn't change the key.
If you use mapValues instead, it does know, so the partitioner is kept and the second reduceByKey
runs without a shuffle. Pretty cool that they optimized that :)
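A small sketch to confirm this, assuming Spark 1.3 or later and a local master (the object and method names are hypothetical): it applies the same value change through map and through mapValues and returns each RDD's partitioner. Per the explanation above, the map result should report None, while the mapValues result should keep the HashPartitioner produced by reduceByKey.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical example object; assumes Spark 1.3+ (pair-RDD implicits need no extra import).
object PartitionerCheck {
  // Returns the partitioners after reduceByKey, after map, and after mapValues.
  def partitioners(sc: SparkContext): (Option[Partitioner], Option[Partitioner], Option[Partitioner]) = {
    val counts = sc.parallelize(List(1, 2, 3, 4, 5, 5, 6, 6, 7, 8, 9, 10, 2, 4))
      .map((_, 1))
      .reduceByKey(_ + _) // hash-partitioned by key

    // The same logical change written two ways.
    val viaMap = counts.map { case (k, v) => (k, v + 1) } // closure could change k: partitioner dropped
    val viaMapValues = counts.mapValues(_ + 1)           // key cannot change: partitioner kept

    (counts.partitioner, viaMap.partitioner, viaMapValues.partitioner)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitioner-check").setMaster("local[2]"))
    try {
      val (afterReduce, afterMap, afterMapValues) = partitioners(sc)
      println(s"after reduceByKey: $afterReduce")
      println(s"after map:         $afterMap")
      println(s"after mapValues:   $afterMapValues")
    } finally {
      sc.stop()
    }
  }
}
```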

2015-03-26 17:44 GMT-04:00 Zhan Zhang <zzhang@hortonworks.com>:
Hi Folks,

Does anybody know why RDD.map does not allow preservesPartitioning? Am I missing
something here?

The following example involves two shuffles. I think that if preservesPartitioning were allowed, we could
avoid the second one, right?

 val r1 = sc.parallelize(List(1,2,3,4,5,5,6,6,7,8,9,10,2,4))
 val r2 = r1.map((_, 1))
 val r3 = r2.reduceByKey(_+_)
 val r4 = r3.map(x=>(x._1, x._2 + 1))
 val r5 = r4.reduceByKey(_+_)
 r5.collect.foreach(println)

scala> r5.toDebugString
res2: String =
(8) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(8) MapPartitionsRDD[3] at map at <console>:27 []
    |  ShuffledRDD[2] at reduceByKey at <console>:25 []
    +-(8) MapPartitionsRDD[1] at map at <console>:23 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:21 []
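Why the second shuffle is avoidable at all can be sketched without Spark. The code below is a hypothetical simulation, not Spark code: it assigns each key to a partition with a non-negative hashCode modulo the partition count (the rule Spark's HashPartitioner uses), applies the r4 transform from the example inside each partition, and checks whether every record still sits in the partition its key hashes to. The partition count of 8 matches the (8) in the debug output; all names are illustrative.

```scala
// Spark-free simulation; every name here is hypothetical.
object HashPartitionSketch {
  // Non-negative hashCode modulo the partition count, the rule HashPartitioner uses.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // A simulated "shuffle": each (key, value) goes to the partition its key hashes to.
  def shuffle[K, V](records: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, numPartitions) }

  // True if every record still lives in the partition its key hashes to,
  // so a later reduce by key could run locally in each partition.
  def stillPartitioned[K, V](parts: Map[Int, Seq[(K, V)]], numPartitions: Int): Boolean =
    parts.forall { case (p, recs) => recs.forall { case (k, _) => partitionOf(k, numPartitions) == p } }

  // Returns (key-preserving result still partitioned, key-changing result still partitioned).
  def check(input: Seq[Int], n: Int): (Boolean, Boolean) = {
    // r2 and r3 from the example: map((_, 1)) then reduceByKey(_ + _), done per partition.
    val r3 = shuffle(input.map((_, 1)), n).map { case (p, recs) =>
      p -> recs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }.toSeq
    }
    // r4 from the example: key untouched, value + 1, applied inside each partition.
    val keyPreserving = r3.map { case (p, recs) => p -> recs.map { case (k, v) => (k, v + 1) } }
    // For contrast, a transform that changes the key.
    val keyChanging = r3.map { case (p, recs) => p -> recs.map { case (k, v) => (k + 1, v) } }
    (stillPartitioned(keyPreserving, n), stillPartitioned(keyChanging, n))
  }

  def main(args: Array[String]): Unit = {
    val (preserved, changed) = check(List(1, 2, 3, 4, 5, 5, 6, 6, 7, 8, 9, 10, 2, 4), 8)
    println(s"key-preserving map still partitioned: $preserved")
    println(s"key-changing map still partitioned:   $changed")
  }
}
```

Because plain map gives Spark no way to tell these two cases apart, it has to assume the second one.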

Thanks.

Zhan Zhang

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
