spark-user mailing list archives

From Christopher Nguyen <...@adatao.com>
Subject Re: RDD and Partition
Date Tue, 28 Jan 2014 21:37:42 GMT
David,

map() iterates row by row, forcing an if on each row.

mapPartitions*() lets you apply a conditional to the whole partition
first, as Mark suggests. That should usually be sufficient.
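
For contrast, a minimal sketch of the row-by-row version (assuming the
rdd26 from Mark's example below; his mapPartitionsWithIndex version makes
the equivalent test once per partition instead):

// the 'A'-'M' test runs once for every element of the RDD
rdd26.map { s =>
  val c = if (s.nonEmpty) s(0).toUpper else ' '
  if (c >= 'A' && c <= 'M') s.toUpperCase else s
}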

SparkContext.runJob() lets you specify exactly which partitions to run
on, if you're sure that's necessary and sufficient, and not
over-optimization.
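
A rough sketch of that (note runJob is an action that returns results to
the driver rather than a new RDD, and its exact signature varies across
Spark versions; some releases also take an allowLocal flag):

// run the function only on the partitions holding 'A'-'M'
// (indices 0 to 12 under the 26-way range partitioner below)
val upper: Array[Array[String]] = sc.runJob(
  rdd26,
  (itr: Iterator[String]) => itr.map(_.toUpperCase).toArray,
  0 to 12)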

Sent while mobile. Pls excuse typos etc.
On Jan 28, 2014 1:30 PM, "Mark Hamstra" <mark@clearstorydata.com> wrote:

> If I'm understanding you correctly, there are lots of ways you could do
> that.  Here's one, continuing from the previous example:
>
> // rdd26: RDD[String] split by first letter into 26 partitions
>
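> // 'A'..'M' map to partition indices 0 to 12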
> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>
> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
> itr.map(_.toUpperCase) else itr }
>
>
>
>
> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434884@gmail.com> wrote:
>
>> Thank you! That helps.
>>
>> A follow-up question on this. How can I apply a function to only a subset
>> of this RDD? Let's say I need toUpperCase applied to all strings starting
>> in the range 'A' - 'M', leaving the rest untouched. Is that possible
>> without running an 'if' condition on all the partitions in the cluster?
>>
>>
>>
>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
>>
>>> scala> import org.apache.spark.RangePartitioner
>>>
>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>
>>> scala> rdd.keyBy(s => s(0).toUpper)
>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>> at <console>:15
>>>
>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>> res0)).values
>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>> <console>:18
>>>
>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>> s))).collect.foreach(println)
>>>
>>> (0,apple)
>>> (1,Ball)
>>> (2,cat)
>>> (3,dog)
>>> (4,Elephant)
>>> (5,fox)
>>> (6,gas)
>>> (7,horse)
>>> (8,index)
>>> (9,jet)
>>> (10,kitsch)
>>> (11,long)
>>> (12,moon)
>>> (13,Neptune)
>>> (14,ooze)
>>> (15,Pen)
>>> (16,quiet)
>>> (17,rose)
>>> (18,sun)
>>> (19,talk)
>>> (20,umbrella)
>>> (21,voice)
>>> (22,Walrus)
>>> (23,xeon)
>>> (24,Yam)
>>> (25,zebra)
>>>
>>>
>>>
>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <nick.pentreath@gmail.com> wrote:
>>>
>>>> If you do something like:
>>>>
>>>> rdd.map{ str => (str.take(1), str) }
>>>>
>>>> you will have an RDD[(String, String)] where the key is the first
>>>> character of the string. Now when you perform an operation that uses
>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>> receiving all the strings with A, the 2nd all the strings with B etc. Note
>>>> that you may not be able to enforce that each *machine* gets a
>>>> different letter, but in most cases that doesn't particularly matter as
>>>> long as you get "all values for a given key go to the same reducer"
>>>> behaviour.
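>>>>
>>>> A quick sketch of that pattern (the reduce function here is just a
>>>> hypothetical example that keeps the longest string per letter):
>>>>
>>>> val keyed = rdd.map { str => (str.take(1).toUpperCase, str) }
>>>> // reduceByKey with an explicit partition count; all values for a
>>>> // given first letter end up in the same reduce task
>>>> val longestPerLetter =
>>>>   keyed.reduceByKey((a, b) => if (a.length >= b.length) a else b, 26)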
>>>>
>>>> Perhaps if you expand on your use case we can provide more detailed
>>>> assistance.
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434884@gmail.com> wrote:
>>>>
>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>> starting with A get collected on machine1, B on machine2, and so on?
>>>>>
>>>>>
>>>>
>>>
>>
>
