spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: RDD and Partition
Date Tue, 28 Jan 2014 22:08:51 GMT
SparkContext#runJob is the basis of an RDD action, so the result of using
runJob to call toUpperCase on the A-to-M partitions will be the uppercased
strings materialized in the driver process, not a transformation of the
original RDD.
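
For instance (a sketch only; exact runJob signatures vary across Spark
versions, and rdd26 here is the value from the example quoted below):

// Evaluate only partitions 0-12 (the 'A'-'M' partitions of rdd26).
// runJob returns one result per selected partition, materialized in the
// driver as Array[Array[String]]; it does not produce a new RDD.
val upperAtoM: Array[Array[String]] =
  sc.runJob(rdd26,
    (itr: Iterator[String]) => itr.map(_.toUpperCase).toArray,
    0 until 13)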


On Tue, Jan 28, 2014 at 1:37 PM, Christopher Nguyen <ctn@adatao.com> wrote:

> David,
>
> map() would iterate row by row, forcing an if on each row.
>
> mapPartitions*() allows you to have a conditional on the whole partition
> first, as Mark suggests. That should usually be sufficient.
>
> SparkContext.runJob() allows you to specify which partitions to run on,
> if you're sure it's necessary and sufficient, and not over-optimization.
>
> Sent while mobile. Pls excuse typos etc.
> On Jan 28, 2014 1:30 PM, "Mark Hamstra" <mark@clearstorydata.com> wrote:
>
>> If I'm understanding you correctly, there's lots of ways you could do
>> that.  Here's one, continuing from the previous example:
>>
>> // rdd26: RDD[String] split by first letter into 26 partitions
>>
>> // indices 0 to 12: the partitions holding strings starting with 'A' through 'M'
>> val range = (Char.char2int('A') - 65 to Char.char2int('M') - 65)
>>
>> rdd26.mapPartitionsWithIndex { (idx, itr) => if (range.contains(idx))
>> itr.map(_.toUpperCase) else itr }
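>>
>> To sanity-check which partitions were touched, you can reuse the
>> indexed-print trick from my earlier message (a sketch; note the above is
>> still a lazy transformation, so nothing runs until the collect):
>>
>> rdd26.mapPartitionsWithIndex { (idx, itr) =>
>>   if (range.contains(idx)) itr.map(_.toUpperCase) else itr
>> }.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx, s)))
>>  .collect.foreach(println)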
>>
>> On Tue, Jan 28, 2014 at 12:52 PM, David Thomas <dt5434884@gmail.com> wrote:
>>
>>> Thank you! That helps.
>>>
>>> A follow-up question on this: how can I apply a function to only a
>>> subset of this RDD? Let's say I need all strings starting with a letter
>>> in the range 'A'-'M' to have toUpperCase applied, leaving the rest
>>> untouched. Is that possible without running an 'if' condition on all
>>> the partitions in the cluster?
>>>
>>>
>>>
>>> On Tue, Jan 28, 2014 at 1:20 PM, Mark Hamstra <mark@clearstorydata.com> wrote:
>>>
>>>> scala> import org.apache.spark.RangePartitioner
>>>>
>>>> scala> val rdd = sc.parallelize(List("apple", "Ball", "cat", "dog",
>>>> "Elephant", "fox", "gas", "horse", "index", "jet", "kitsch", "long",
>>>> "moon", "Neptune", "ooze", "Pen", "quiet", "rose", "sun", "talk",
>>>> "umbrella", "voice", "Walrus", "xeon", "Yam", "zebra"))
>>>>
>>>> scala> rdd.keyBy(s => s(0).toUpper)
>>>> res0: org.apache.spark.rdd.RDD[(Char, String)] = MappedRDD[1] at keyBy
>>>> at <console>:15
>>>>
>>>> scala> res0.partitionBy(new RangePartitioner[Char, String](26,
>>>> res0)).values
>>>> res2: org.apache.spark.rdd.RDD[String] = MappedRDD[5] at values at
>>>> <console>:18
>>>>
>>>> scala> res2.mapPartitionsWithIndex((idx, itr) => itr.map(s => (idx,
>>>> s))).collect.foreach(println)
>>>>
>>>> (0,apple)
>>>> (1,Ball)
>>>> (2,cat)
>>>> (3,dog)
>>>> (4,Elephant)
>>>> (5,fox)
>>>> (6,gas)
>>>> (7,horse)
>>>> (8,index)
>>>> (9,jet)
>>>> (10,kitsch)
>>>> (11,long)
>>>> (12,moon)
>>>> (13,Neptune)
>>>> (14,ooze)
>>>> (15,Pen)
>>>> (16,quiet)
>>>> (17,rose)
>>>> (18,sun)
>>>> (19,talk)
>>>> (20,umbrella)
>>>> (21,voice)
>>>> (22,Walrus)
>>>> (23,xeon)
>>>> (24,Yam)
>>>> (25,zebra)
>>>>
>>>>
>>>>
>>>> On Tue, Jan 28, 2014 at 11:48 AM, Nick Pentreath <nick.pentreath@gmail.com> wrote:
>>>>
>>>>> If you do something like:
>>>>>
>>>>> rdd.map{ str => (str.take(1), str) }
>>>>>
>>>>> you will have an RDD[(String, String)] where the key is the first
>>>>> character of the string. Now when you perform an operation that uses
>>>>> partitioning (e.g. reduceByKey) you will end up with the 1st reduce task
>>>>> receiving all the strings with A, the 2nd all the strings with B, etc.
>>>>> Note that you may not be able to enforce that each *machine* gets a
>>>>> different letter, but in most cases that doesn't particularly matter, as
>>>>> long as you get "all values for a given key go to the same reducer"
>>>>> behaviour.
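>>>>>
>>>>> If you do need to pin each letter to a fixed reducer, you can pass a
>>>>> custom Partitioner to reduceByKey. A sketch (Letter26Partitioner is an
>>>>> illustrative name, not a Spark class, and it assumes every key starts
>>>>> with a letter):
>>>>>
>>>>> import org.apache.spark.Partitioner
>>>>> import org.apache.spark.SparkContext._ // pair-RDD ops on older Spark
>>>>>
>>>>> // Sends key "A" to partition 0, "B" to 1, ..., "Z" to 25.
>>>>> class Letter26Partitioner extends Partitioner {
>>>>>   def numPartitions: Int = 26
>>>>>   def getPartition(key: Any): Int = key.toString.head.toUpper - 'A'
>>>>> }
>>>>>
>>>>> rdd.map(str => (str.take(1).toUpperCase, str))
>>>>>    .reduceByKey(new Letter26Partitioner, _ + ", " + _)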
>>>>>
>>>>> Perhaps if you expand on your use case we can provide more detailed
>>>>> assistance.
>>>>>
>>>>>
>>>>> On Tue, Jan 28, 2014 at 9:35 PM, David Thomas <dt5434884@gmail.com> wrote:
>>>>>
>>>>>> Let's say I have an RDD of Strings and there are 26 machines in the
>>>>>> cluster. How can I repartition the RDD in such a way that all strings
>>>>>> starting with A get collected on machine1, all strings starting with B
>>>>>> on machine2, and so on?
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
