OK, this is how I solved it. It's not elegant at all, but it works and I need to move ahead at this time.
Converting to a pair RDD is no longer required.
                reacRdd.map(line => line.split(','))
                  .map(fields => {
                    if (fields.length >= 10 && !fields(0).contains("VAERS_ID")) {
                      // build "id,term" pairs for the five reaction columns, tab-separated
                      fields(0) + "," + fields(1) + "\t" +
                      fields(0) + "," + fields(3) + "\t" +
                      fields(0) + "," + fields(5) + "\t" +
                      fields(0) + "," + fields(7) + "\t" +
                      fields(0) + "," + fields(9)
                    } else {
                      ""   // header row or short row: emit an empty marker, filtered out below
                    }
                  })
                  .flatMap(str => str.split('\t'))     // one "id,term" record per element
                  .filter(line => line.nonEmpty)       // drop the empty markers
                  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
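
For completeness, the pair-RDD route I originally wanted would look roughly like this. This is just a sketch: it assumes the same reacRdd / outFile as above, the reaction terms sitting in columns 1, 3, 5, 7 and 9, and the Spark 1.x import that brings in the pair-RDD implicits.

                import org.apache.spark.SparkContext._   // pulls in the PairRDDFunctions implicits on Spark 1.x

                // Sketch only: key in column 0, reaction terms assumed in columns 1, 3, 5, 7, 9
                reacRdd.map(line => line.split(','))
                  .filter(fields => fields.length >= 10 && !fields(0).contains("VAERS_ID"))
                  .map(fields => (fields(0), Seq(fields(1), fields(3), fields(5), fields(7), fields(9))))
                  .flatMapValues(terms => terms)                   // fan the key out over the terms
                  .filter { case (_, term) => term.nonEmpty }      // drop empty reaction columns
                  .map { case (id, term) => id + "," + term }
                  .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)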


From: Sanjay Subramanian <sanjaysubramanian@yahoo.com.INVALID>
To: Hitesh Khamesra <hiteshkh25@gmail.com>
Cc: Kapil Malik <kmalik@adobe.com>; Sean Owen <sowen@cloudera.com>; "user@spark.apache.org" <user@spark.apache.org>
Sent: Thursday, January 1, 2015 12:39 PM
Subject: Re: FlatMapValues

thanks let me try that out




From: Hitesh Khamesra <hiteshkh25@gmail.com>
To: Sanjay Subramanian <sanjaysubramanian@yahoo.com>
Cc: Kapil Malik <kmalik@adobe.com>; Sean Owen <sowen@cloudera.com>; "user@spark.apache.org" <user@spark.apache.org>
Sent: Thursday, January 1, 2015 9:46 AM
Subject: Re: FlatMapValues

How about this: apply flatMap per line, and in that function parse each line and return all the columns as per your need.
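
Roughly like this (just a sketch, assuming the reacRdd / outFile names from your earlier mail and the reaction terms sitting in columns 1, 3, 5, 7, 9):

    // Sketch only: key in column 0, reaction terms assumed in columns 1, 3, 5, 7, 9;
    // skips the header row and any missing/empty columns.
    reacRdd.flatMap { line =>
      val fields = line.split(',')
      if (!fields(0).contains("VAERS_ID"))
        Seq(1, 3, 5, 7, 9)
          .filter(i => i < fields.length && fields(i).nonEmpty)
          .map(i => fields(0) + "," + fields(i))
      else
        Seq.empty[String]
    }.saveAsTextFile("/data/vaers/msfx/reac/" + outFile)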



On Wed, Dec 31, 2014 at 10:16 AM, Sanjay Subramanian <sanjaysubramanian@yahoo.com.invalid> wrote:
hey guys

Some of you may care :-) but this is just to give you some background on where I am going with this. I have an iOS medical side-effects app called MedicalSideFx. I built the entire underlying data-layer aggregation using Hadoop, and currently the search is based on Lucene. I am re-architecting the data layer by replacing Hadoop with Spark and integrating FDA data, Canadian side-effects data and vaccine side-effects data.

  
@Kapil, sorry, but flatMapValues is being reported as undefined.

To give you a complete picture of the code (it's inside IntelliJ, but that's only for testing... the real code runs in spark-shell on my cluster):


If you assume the dataset is

025003,Delirium,8.10,Hypokinesia,8.10,Hypotonia,8.10,,,,
025005,Arthritis,8.10,Injection site oedema,8.10,Injection site reaction,8.10,,,,

In the present version of the code, the flatMap works, but it only gives me the values
Delirium
Hypokinesia
Hypotonia
Arthritis
Injection site oedema
Injection site reaction


What I need is

025003,Delirium
025003,Hypokinesia
025003,Hypotonia
025005,Arthritis
025005,Injection site oedema
025005,Injection site reaction


thanks

sanjay


From: Kapil Malik <kmalik@adobe.com>
To: Sean Owen <sowen@cloudera.com>; Sanjay Subramanian <sanjaysubramanian@yahoo.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Sent: Wednesday, December 31, 2014 9:35 AM
Subject: RE: FlatMapValues

Hi Sanjay,

Oh yes, on flatMapValues: it's defined in PairRDDFunctions, and you need to import org.apache.spark.SparkContext._ to pick up those methods (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions).
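
For instance, on a toy dataset (just a sketch; sc here is the usual spark-shell SparkContext):

    import org.apache.spark.SparkContext._          // brings the pair-RDD implicits into scope (Spark 1.x)

    // "1,red,blue,green" -> ("1", "red\tblue\tgreen") -> "1,red", "1,blue", "1,green"
    val pairs = sc.parallelize(Seq("1,red,blue,green", "2,yellow,violet,pink"))
      .map(line => line.split(','))
      .map(fields => (fields(0), fields.drop(1).mkString("\t")))   // RDD[(String, String)]

    pairs.flatMapValues(v => v.split('\t'))          // compiles because pairs is a pair RDD
      .map { case (k, v) => k + "," + v }
      .collect()                                     // Array(1,red, 1,blue, 1,green, 2,yellow, ...)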

@Sean, yes indeed flatMap / flatMapValues both can be used.

Regards,

Kapil



-----Original Message-----
From: Sean Owen [mailto:sowen@cloudera.com]
Sent: 31 December 2014 21:16
To: Sanjay Subramanian
Cc: user@spark.apache.org
Subject: Re: FlatMapValues

From the clarification below, the problem is that you are calling flatMapValues, which is only available on an RDD of key-value tuples.
Your map function returns a tuple in one case but a String in the other, so your RDD is a bunch of Any, which is not at all what you want. You need to return a tuple in both cases, which is what Kapil pointed out.

However, it's still not quite what you want. Your input is basically [key, value1, value2, value3], so you want to flatMap that to (key,value1), (key,value2), (key,value3). flatMapValues does not come into play.
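
In code, that is roughly (a sketch only, using the toy 1,red,blue,green rows from your summary; sc is the spark-shell SparkContext):

    // key in column 0, values in the remaining columns
    sc.parallelize(Seq("1,red,blue,green", "2,yellow,violet,pink"))
      .map(line => line.split(','))
      .flatMap(fields => fields.tail.map(v => fields(0) + "," + v))
      .collect()   // Array(1,red, 1,blue, 1,green, 2,yellow, 2,violet, 2,pink)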

On Wed, Dec 31, 2014 at 3:25 PM, Sanjay Subramanian <sanjaysubramanian@yahoo.com> wrote:
> My understanding is as follows
>
> STEP 1 (This would create a pair RDD)
> =======
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> })
>
> STEP 2
> =======
> Since the previous step created a pair RDD, I thought the flatMapValues
> method would be applicable.
> But the code does not even compile, saying that flatMapValues is not
> applicable to RDD :-(
>
>
> reacRdd.map(line => line.split(',')).map(fields => {
>   if (fields.length >= 11 && !fields(0).contains("VAERS_ID")) {
>     (fields(0), (fields(1)+"\t"+fields(3)+"\t"+fields(5)+"\t"+fields(7)+"\t"+fields(9)))
>   }
>   else {
>     ""
>   }
> }).flatMapValues(skus => skus.split('\t'))
>   .saveAsTextFile("/data/vaers/msfx/reac/" + outFile)
>
>
> SUMMARY
> =======
> when a dataset looks like the following
>
> 1,red,blue,green
> 2,yellow,violet,pink
>
> I want to output the following, and I am asking how do I do that?
> Perhaps my code is 100% wrong. Please correct me and educate me :-)
>
> 1,red
> 1,blue
> 1,green
> 2,yellow
> 2,violet
> 2,pink


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org