spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: RDD and Partition
Date Tue, 28 Jan 2014 21:30:47 GMT
Doesn't avoid an 'if' on every partition, but does avoid it on every
element of every partition.
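
To make the per-partition versus per-element distinction concrete, here is a minimal sketch. It assumes spark-core is on the classpath and that partition i of rdd26 holds the strings for the i-th letter, as in the output quoted further down. The names UpperCaseSubset, targetPartitions and upperCaseSelected are made up for illustration and are not part of Spark.

```scala
import org.apache.spark.rdd.RDD

object UpperCaseSubset {
  // Partition indices 0..12, i.e. the letters 'A'..'M', under the assumption
  // that partition i holds the strings starting with the i-th letter.
  val targetPartitions: Range = ('A' - 'A') to ('M' - 'A')

  // Called once per partition: the range check runs a single time here,
  // not once for every string in the partition.
  def upperCaseSelected(idx: Int, itr: Iterator[String]): Iterator[String] =
    if (targetPartitions.contains(idx)) itr.map(_.toUpperCase) else itr

  // Partitions outside the range are passed through unchanged.
  def apply(rdd26: RDD[String]): RDD[String] =
    rdd26.mapPartitionsWithIndex((idx, itr) => upperCaseSelected(idx, itr))
}
```

Every partition still gets a task and evaluates the check once, so this does not skip partitions; it only avoids testing each element.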


On Tue, Jan 28, 2014 at 1:29 PM, Mark Hamstra <mark@clearstorydata.com> wrote:

> If I'm understanding you correctly, there are lots of ways you could do
> that.  Here's one, continuing from the previous example:
>
> // rdd26: RDD[String] split by first letter into 26 partitions
>
> val range = ('A' - 'A') to ('M' - 'A')  // partition indices 0 to 12
>
> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }
>
>
>
>
> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434884@gmail.com> wrote:
>
>> Thank you! That helps.
>>
>> A follow-up question on this: how can I apply a function to only a subset
>> of this RDD? Let's say I need toUpperCase applied to all strings starting
>> with a letter in the range 'A' - 'M', leaving the rest untouched. Is that
>> possible without running an 'if' condition on all the partitions in the
>> cluster?
>>
>>
>>
>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
>>
>>> scala> import org.apache.spark.RangePartitioner
>>>
>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>
>>> scala> rdd.keyBy(s => s(0).toUpper)
>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>> at <console>:15
>>>
>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>> res0)).values
>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>> <console>:18
>>>
>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>> s))).collect.foreach(println)
>>>
>>> (0,apple)
>>> (1,Ball)
>>> (2,cat)
>>> (3,dog)
>>> (4,Elephant)
>>> (5,fox)
>>> (6,gas)
>>> (7,horse)
>>> (8,index)
>>> (9,jet)
>>> (10,kitsch)
>>> (11,long)
>>> (12,moon)
>>> (13,Neptune)
>>> (14,ooze)
>>> (15,Pen)
>>> (16,quiet)
>>> (17,rose)
>>> (18,sun)
>>> (19,talk)
>>> (20,umbrella)
>>> (21,voice)
>>> (22,Walrus)
>>> (23,xeon)
>>> (24,Yam)
>>> (25,zebra)
>>>
>>>
>>>
>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <
>>> nick.pentreath@gmail.com> wrote:
>>>
>>>> If you do something like:
>>>>
>>>> rdd.map{ str => (str.take(1), str) }
>>>>
>>>> you will have an RDD[(String, String)] where the key is the first
>>>> character of the string. Now when you perform an operation that uses
>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>> receiving all the strings with A, the 2nd all the strings with B etc. Note
>>>> that you may not be able to enforce that each *machine* gets a
>>>> different letter, but in most cases that doesn't particularly matter as
>>>> long as you get "all values for a given key go to the same reducer"
>>>> behaviour.
>>>>
>>>> Perhaps if you expand on your use case we can provide more detailed
>>>> assistance.
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434884@gmail.com> wrote:
>>>>
>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>> starting with A get collected on machine1, B on machine2 and so on?
>>>>>
>>>>>
>>>>
>>>
>>
>
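
For the original question at the bottom of the thread, another option is a custom Partitioner that maps each first letter to a fixed partition index. RangePartitioner chooses its boundaries by sampling the data, so its letter-to-index mapping is not guaranteed in general. The sketch below assumes spark-core is on the classpath; FirstLetterPartitioner and FirstLetterPartitioning are hypothetical names, not Spark classes, and sending non-letters to partition 0 and dropping empty strings are arbitrary choices made for illustration.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical partitioner (not part of Spark): one partition per letter A-Z.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 26

  override def getPartition(key: Any): Int = key match {
    case c: Char =>
      val u = c.toUpper
      // 'A' maps to 0, 'B' to 1, ..., 'Z' to 25.
      // Assumption: any key outside A-Z falls into partition 0.
      if (u >= 'A' && u <= 'Z') u - 'A' else 0
    case _ => 0
  }

  // Lets Spark recognise two instances as the same partitioning.
  override def equals(other: Any): Boolean = other.isInstanceOf[FirstLetterPartitioner]
  override def hashCode: Int = numPartitions
}

object FirstLetterPartitioning {
  // Keys each string by its upper-cased first character, shuffles by that key,
  // then drops the key again. Empty strings are filtered out (an assumption)
  // because s(0) would throw on them.
  def byFirstLetter(rdd: RDD[String]): RDD[String] =
    rdd.filter(_.nonEmpty)
      .keyBy(s => s(0).toUpper)
      .partitionBy(new FirstLetterPartitioner)
      .values
}
```

On Spark releases from around the time of this thread, partitionBy may additionally need `import org.apache.spark.SparkContext._` to bring the pair-RDD implicits into scope. As noted in the thread, this fixes which partition a letter lands in, not which machine runs that partition.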
