On Wed, Jun 4, 2014 at 5:56 AM, Amit Kumar <kumaramit01@gmail.com> wrote:

Hi Folks,

I am new to Spark, and this is probably a basic question.

I have a file on HDFS:

1, one
1, uno
2, two
2, dos

I want to create a multimap RDD, RDD[Map[String,List[String]]]:

{"1"->["one","uno"], "2"->["two","dos"]}

Actually, what you described is not a “multi-map RDD”; the type of this RDD should be something like RDD[(String, List[String])]. RDD[Map[String, List[String]]] means that each element of the RDD is itself a Map[String, List[String]], and judging from the context I don’t think that is what you want.
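
To make the difference concrete, here is a small sketch that models RDD contents with local Scala Seqs (an assumption made only so it runs without a cluster; each Seq element plays the role of one RDD element). In the pair version there is one element per key, while in the Map version a single element already holds every key.

```scala
// Local Seqs stand in for RDD contents; each Seq element plays the role of one RDD element.

// RDD[(String, List[String])]: one element per key.
val pairElements: Seq[(String, List[String])] =
  Seq("1" -> List("one", "uno"), "2" -> List("two", "dos"))

// RDD[Map[String, List[String]]]: every element is a whole Map.
val mapElements: Seq[Map[String, List[String]]] =
  Seq(Map("1" -> List("one", "uno"), "2" -> List("two", "dos")))

println(pairElements.size) // 2: one element per key
println(mapElements.size)  // 1: a single element holding both keys
```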

First I read the file 
val identityData:RDD[String] = sc.textFile($path_to_the_file, 2).cache()

You don’t need to call .cache() here: identityData is used only once, so the cached data would never be read again.

val identityDataList:RDD[List[String]]=
      identityData.map{ line =>
        val splits= line.split(",")
        ...

Turning each text line into a (String, String) pair is much more useful, since you can then call functions like groupByKey, which are defined in PairRDDFunctions:

val identityDataPairs: RDD[(String, String)] = identityData.map { line =>
  val Array(key, value) = line.split(",", 2) // split on the first comma only
  key.trim -> value.trim                      // drop the space after the comma
}
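
One detail worth checking against the sample data: "1, one".split(",") yields "1" and " one" (note the leading space), so the values need trimming to match the expected output. The sketch below uses a hypothetical parseLine helper (not part of Spark), exercised on plain Scala strings, that trims both sides, splits only on the first comma, and returns None for a line without a comma instead of throwing a MatchError.

```scala
// Hypothetical helper (not part of Spark): parse one "key, value" line.
// split(",", 2) splits on the first comma only, so commas inside the value survive;
// trim removes the space that follows the comma in lines like "1, one".
def parseLine(line: String): Option[(String, String)] =
  line.split(",", 2) match {
    case Array(key, value) => Some(key.trim -> value.trim)
    case _                 => None // no comma: treat the line as malformed
  }

val sample = Seq("1, one", "1, uno", "2, two", "2, dos", "not a pair")
val pairs = sample.flatMap(line => parseLine(line))
println(pairs) // List((1,one), (1,uno), (2,two), (2,dos))
```

In a Spark job the same function could be applied as identityData.flatMap(line => parseLine(line)), which drops malformed lines rather than failing the task.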

Then I group them by the first element

 val grouped:RDD[(String,Iterable[List[String]])]=
      element =>{
      ...

With a pair RDD, grouping is just a call to groupByKey, as mentioned above:

val grouped: RDD[(String, Iterable[String])] = identityDataPairs.groupByKey()
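
For intuition about what groupByKey returns, the sketch below reproduces the same grouping on a local Scala Seq with groupBy (a local stand-in, not Spark itself). One difference to keep in mind: on a real RDD, groupByKey does not guarantee the order of the values within each group.

```scala
// Already-parsed pairs, as produced by the map step above.
val pairs = Seq("1" -> "one", "1" -> "uno", "2" -> "two", "2" -> "dos")

// Local analogue of groupByKey: one entry per key, holding all of that key's values.
val grouped: Map[String, Seq[String]] =
  pairs.groupBy { case (key, _) => key }
       .map { case (key, kvs) => key -> kvs.map { case (_, value) => value } }

println(grouped("1")) // List(one, uno)
```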

Then I do the equivalent of mapValues on Scala collections to get rid of the first element:

 val groupedWithValues:RDD[(String,List[String])] =
    grouped.flatMap[(String,List[String])]{ case (key,list)=>{
      List((key,list.map{element => {
      ...
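
A note on this step: a flatMap whose function always returns a single-element List is just a map, and pair RDDs already offer mapValues through PairRDDFunctions. For the RDD[(String, Iterable[String])] that groupByKey returns, grouped.mapValues(_.toList) gives an RDD[(String, List[String])] directly. The local sketch below (plain Scala, with data shaped like the question's split rows as an assumption) shows that the two forms agree.

```scala
// Assumed shape, mirroring the question: every key maps to the split rows,
// e.g. List("1", " one"), whose first element is the key itself.
val grouped: Seq[(String, Iterable[List[String]])] = Seq(
  "1" -> List(List("1", " one"), List("1", " uno")),
  "2" -> List(List("2", " two"), List("2", " dos"))
)

// The question's pattern: a flatMap whose function always returns exactly one element...
val viaFlatMap: Seq[(String, List[String])] =
  grouped.flatMap { case (key, rows) => List((key, rows.map(row => row(1).trim).toList)) }

// ...produces the same result as a plain map that only rewrites the values,
// which is what mapValues does on a pair RDD.
val viaMap: Seq[(String, List[String])] =
  grouped.map { case (key, rows) => (key, rows.map(row => row(1).trim).toList) }

println(viaMap) // List((1,List(one, uno)), (2,List(two, dos)))
```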

For this to actually materialize, I call collect:

 val groupedAndCollected=groupedWithValues.collect()

I get an Array[(String, List[String])].

I am trying to figure out if there is a way for me to get a Map[String,List[String]] (a multimap), or to create an RDD[Map[String,List[String]]].

To get a Map[String, Iterable[String]], you can simply call collectAsMap, which is defined only on pair RDDs and returns a scala.collection.Map:

val groupedAndCollected = grouped.collectAsMap()
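
Putting these steps together, here is an end-to-end sketch. Assumptions: it runs with a local master (local[2]), uses sc.parallelize on the four sample lines as a stand-in for sc.textFile, and adds mapValues(_.toList) so the values come back as Lists rather than Iterables. On Spark releases before 1.3 (including 1.0, current when this thread was written), the pair-RDD methods also need import org.apache.spark.SparkContext._ to be in scope.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local master for experimentation; on a cluster the master comes from spark-submit.
val conf = new SparkConf().setAppName("multimap-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)

// Stand-in for sc.textFile(...): the sample lines from the question.
val lines = sc.parallelize(Seq("1, one", "1, uno", "2, two", "2, dos"))

// Parse each line into a (key, value) pair, trimming the space after the comma.
val pairs = lines.map { line =>
  val parts = line.split(",", 2)
  parts(0).trim -> parts(1).trim
}

// Group on the executors, turn each Iterable into a List, then bring the
// result back to the driver as a Map (only safe when the result is small).
val multimap: scala.collection.Map[String, List[String]] =
  pairs.groupByKey().mapValues(_.toList).collectAsMap()

println(multimap)
sc.stop()
```

The order of values inside each List is not guaranteed by groupByKey, so sort them if the order matters.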

I am sure there is something simpler; I would appreciate any advice.

Many thanks,

Finally, be careful if you are processing a large volume of data: groupByKey is an expensive transformation, because every value is shuffled across the network without map-side combining, and collecting all the data to the driver may cause an OutOfMemoryError if it doesn’t fit in the driver’s memory.
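
If the grouped result may be too large for the driver, one option is to keep it distributed and pull only a bounded sample back for inspection. The sketch below uses take(n) for that, assuming a local master and parallelized sample pairs in place of the real input; for the full output, saveAsTextFile would write it back to HDFS instead of collecting it (outputDir in the comment is a hypothetical path).

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("bounded-preview-sketch").setMaster("local[2]"))

val pairs = sc.parallelize(Seq("1" -> "one", "1" -> "uno", "2" -> "two", "2" -> "dos"))
val grouped = pairs.groupByKey().mapValues(_.toList)

// take(n) returns at most n groups to the driver, unlike collect(),
// which returns every group and can exhaust driver memory.
val preview: Array[(String, List[String])] = grouped.take(1)

// To keep the full result off the driver, write it out instead, e.g.
// grouped.saveAsTextFile(outputDir)  // outputDir: a hypothetical HDFS path
sc.stop()
```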