Thanks, that was what I was missing!

arun


arun
______________
Arun Swami
+1 408-338-0906



On Fri, May 2, 2014 at 4:28 AM, Mayur Rustagi <mayur.rustagi@gmail.com> wrote:
You need to first partition the data by the key
Use mappartition instead of map. 



On Fri, May 2, 2014 at 5:33 AM, Arun Swami <arun@caspida.com> wrote:
Hi,

I am a newbie to Spark. I looked for documentation or examples to answer my question but came up empty handed.

I don't know whether I am using the right terminology but here goes.

I have a file of records. Initially, I had the following Spark program (I am omitting all the surrounding code and focusing only on the Spark related code):

...
val recordsRDD = sc.textFile(pathSpec, 2).cache
val countsRDD: RDD[(String, Int)] = recordsRDD.flatMap(x => getCombinations(x))
  .map(e => (e, 1))
  .reduceByKey(_ + _)
...

Here getCombinations() is a function I have written that takes a record and returns List[String].

This program works as expected.

Now, I want to do the following. I want to partition the records in recordsRDD by some key extracted from each record. I do this as follows:

val keyValueRecordsRDD: RDD[(String, String)] =
  recodsRDD.flatMap(getKeyValueRecord(_))

Here getKeyValueRecord() is a function I have written that takes a record and returns a Tuple2 of a key and the original record.

Now I want to do the same operations as before (getCombinations(), and count occurrences) BUT on each partition as defined by the key. Essentially, I want to apply the operations individually in each partition. In a separate step, I want to recover the
global counts across all partitions while keeping the partition based counts.

How can I do this in Spark? 

Thanks!

arun
______________
Arun Swami