SparkContext#runJob is the basis of an RDD action, so the result of using runJob to call toUpperCase on the A-to-M partitions will be the uppercased strings materialized in the driver process, not a transformation of the original RDD.
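
A minimal sketch of that point, not code from the thread. It assumes a recent Spark release in which runJob takes (rdd, func, partitions); the 0.9-era signature also required an allowLocal flag. The object name RunJobSketch and the four-element RDD are hypothetical stand-ins for rdd26: parallelize with 4 slices puts one string in each partition, so partitions 0 and 1 play the role of the A-to-M partitions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RunJobSketch {
  // Returns one Array[String] per requested partition, already collected on the driver.
  def run(): Array[Array[String]] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("runJob-sketch"))
    try {
      // Stand-in for rdd26: 4 elements in 4 slices, one string per partition.
      val rdd = sc.parallelize(Seq("apple", "ball", "yam", "zebra"), 4)

      // Run the function on partitions 0 and 1 only; partitions 2 and 3 are never computed.
      sc.runJob[String, Array[String]](
        rdd,
        (itr: Iterator[String]) => itr.map(_.toUpperCase).toArray,
        Seq(0, 1))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    run().flatten.foreach(println)
}
```

The return type is a plain local Array rather than an RDD, which is the difference from the mapPartitionsWithIndex approach quoted below: that one yields a new RDD that still contains the untouched N-to-Z partitions.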


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <ctn@adatao.com> wrote:

David,

map() would iterate row by row, forcing an if on each row.

mapPartitions*() allows you to have a conditional on the whole partition first, as Mark suggests. That should usually be sufficient.

SparkContext.runJob() allows you to specify which partitions to run on, if you're sure that is necessary and sufficient and not over-optimization.


On Jan 28, 2014 1:30 PM, "Mark Hamstra" <mark@clearstorydata.com> wrote:
If I'm understanding you correctly, there are lots of ways you could do that. Here's one, continuing from the previous example:

// rdd26: RDD[String] split by first letter into 26 partitions

val range = 0 to ('M' - 'A')  // partition indices 0 ('A') through 12 ('M')

rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx)) itr.map(_.toUpperCase) else itr }




On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434884@gmail.com> wrote:
Thank you! That helps.

A follow-up question on this: how can I apply a function to only a subset of this RDD? Let's say I need toUpperCase applied to all strings starting with a letter in the range 'A' - 'M', leaving the rest untouched. Is that possible without running an 'if' condition on all the partitions in the cluster?



On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
scala> import org.apache.spark.RangePartitioner

scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog", "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long", "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk", "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))

scala> rdd.keyBy(s => s(0).toUpper)
res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy at <console>:15

scala> res0.partitionBy(new RangePartitioner[Char, String](26, res0)).values
res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at <console>:18

scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s))).collect.foreach(println)

(0,apple)
(1,Ball)
(2,cat)
(3,dog)
(4,Elephant)
(5,fox)
(6,gas)
(7,horse)
(8,index)
(9,jet)
(10,kitsch)
(11,long)
(12,moon)
(13,Neptune)
(14,ooze)
(15,Pen)
(16,quiet)
(17,rose)
(18,sun)
(19,talk)
(20,umbrella)
(21,voice)
(22,Walrus)
(23,xeon)
(24,Yam)
(25,zebra)



On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <nick.pentreath@gmail.com> wrote:
If you do something like:

rdd.map{ str => (str.take(1), str) }

you will have an RDD[(String, String)] where the key is the first character of the string. Now when you perform an operation that uses partitioning (e.g. reduceByKey) you will end up with the 1st reduce task receiving all the strings with A, the 2nd all the strings with B etc. Note that you may not be able to enforce that each machine gets a different letter, but in most cases that doesn't particularly matter as long as you get "all values for a given key go to the same reducer" behaviour.
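
A hedged sketch of how the letter-to-partition mapping could be pinned down explicitly. The default hash partitioner only guarantees that equal keys land in the same partition, not that "A" lands in the first one, so this uses a custom Partitioner. FirstLetterPartitioner and PartitionByLetterSketch are hypothetical names, not part of Spark, and the input strings are made up. It controls partitions only; which machine runs each partition is still up to the scheduler.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical partitioner: keys starting with A/a go to partition 0, B/b to 1, ... Z/z to 25.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 26

  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty =>
      val c = s.head.toUpper
      if (c >= 'A' && c <= 'Z') c - 'A' else 0 // non-letters fall back to partition 0
    case _ => 0 // empty or non-String keys also fall back to partition 0
  }
}

object PartitionByLetterSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("partition-by-letter"))
    try {
      val byLetter = sc.parallelize(Seq("apple", "Ball", "cat", "avocado"))
        .map(str => (str.take(1), str))          // key by first character, as above
        .partitionBy(new FirstLetterPartitioner) // shuffle into 26 letter partitions

      // Print each string together with the index of the partition it ended up in.
      byLetter
        .mapPartitionsWithIndex((idx, itr) => itr.map { case (_, s) => (idx, s) })
        .collect()
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

A production partitioner would normally also override equals and hashCode so Spark can recognize two instances as the same partitioning; that is left out here to keep the sketch short.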

Perhaps if you expand on your use case we can provide more detailed assistance.


On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434884@gmail.com> wrote:
Let's say I have an RDD of Strings and there are 26 machines in the cluster. How can I repartition the RDD in such a way that all strings starting with A get collected on machine1, B on machine2, and so on?