spark-user mailing list archives

From Guillaume Pitel <guillaume.pi...@exensa.com>
Subject Forcing RDD computation with something else than count() ?
Date Tue, 21 Jan 2014 09:02:06 GMT
Hi,

I'm struggling a bit with something: I have several datasets of type RDD[((Int,Int),Double)] that I want to merge.

I've tried union+reduceByKey and cogroup+mapValues, but in both cases it seems that if I don't force the computation of the intermediate RDDs, the final task fails, probably because the dataset is too big.
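
(Stripped down, the two approaches look roughly like this; the helper names and imports are just for illustration, not part of my actual code below.)

   import org.apache.spark.SparkContext._   // pair-RDD operations (reduceByKey, cogroup, mapValues)
   import org.apache.spark.rdd.RDD

   // Strategy 1: union everything, then sum the values per (i, j) key
   def mergeByUnion(datasets: List[RDD[((Int, Int), Double)]]): RDD[((Int, Int), Double)] =
     datasets.reduce(_ union _).reduceByKey(_ + _)

   // Strategy 2: cogroup two RDDs by key, then sum both groups of values
   def mergeByCogroup(a: RDD[((Int, Int), Double)],
                      b: RDD[((Int, Int), Double)]): RDD[((Int, Int), Double)] =
     a.cogroup(b).mapValues { case (va, vb) => va.sum + vb.sum }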

Currently I use count() and persist() to force the computation, but I suspect this adds unnecessary overhead. Is there any other way to force the computation?
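
(For illustration, the persist-then-force pattern I mean, plus a no-op action as one possible lighter alternative; the helper names are made up, and I'm not sure the no-op action is actually cheaper in practice.)

   import org.apache.spark.rdd.RDD
   import org.apache.spark.storage.StorageLevel

   // Current pattern: cache, then run a full action so the RDD is materialized
   def forceWithCount[T](rdd: RDD[T]): RDD[T] = {
     rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
     rdd.count()                    // evaluates the whole lineage and fills the cache
     rdd
   }

   // An action that still touches every partition but aggregates nothing
   def forceWithNoOp[T](rdd: RDD[T]): RDD[T] = {
     rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
     rdd.foreachPartition(_ => ())  // materializes the cached partitions
     rdd
   }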

Or any advice on this kind of problem? For scale: 10 datasets of 200M-400M elements each, about 2B elements once merged.

My code is below:

   def reduceCooccurrences(datasets: List[RDD[(Cooccurrence, Double)]]): RDD[(Cooccurrence, Double)] = {
     println("Reducing a list of " + datasets.length)
     // Merge up to 3 RDDs directly with cogroup; otherwise split the list in half and recurse
     val result = if (datasets.length == 1)
       datasets(0)
     else if (datasets.length == 2) {
       datasets.map { rdd => println("- RDD of " + rdd.count() + " elements") }
       val r = datasets(0).cogroup(datasets(1)).mapValues { case (sda, sdb) =>
         sda.sum + sdb.sum
       }
       datasets.map(_.unpersist())
       r
     } else if (datasets.length == 3) {
       datasets.map { rdd => println("- RDD of " + rdd.count() + " elements") }
       val r = datasets(0).cogroup(datasets(1), datasets(2)).mapValues { case (sda, sdb, sdc) =>
         sda.sum + sdb.sum + sdc.sum
       }
       datasets.map(_.unpersist())
       r
     } else {
       val (b, e) = datasets.splitAt(datasets.length / 2)
       reduceCooccurrences(b).cogroup(reduceCooccurrences(e)).mapValues { case (sda, sdb) =>
         sda.sum + sdb.sum
       }
     }
     // Force computation of the merged RDD before returning it
     result.persist(StorageLevel.MEMORY_AND_DISK_SER)
     println("Total elements " + result.count())
     result
   }

   def mergeCooccurrences(sc: SparkContext, inputs: List[String], output: String,
                          symmetric: Boolean, dumpText: Boolean) {
     val nbSets = inputs.length
     // Load each input matrix and sum duplicate entries per (i, j) key
     val datasets = inputs.map { input =>
       SparseMatrixIO.load(sc, input).groupByKey().mapValues {
         sd => sd.map(_.get()).sum
       }.persist(StorageLevel.MEMORY_AND_DISK_SER)
     }
     val dataset = reduceCooccurrences(datasets)
     val result = if (symmetric) {
       // Also emit the transposed entry (j, i), then sum per key
       dataset.flatMap {
         case c => Seq(c, (new Cooccurrence(c._1.j, c._1.i), c._2))
       }.groupByKey().mapValues { sd => new DoubleWritable(sd.map(_.get()).sum) }
     } else {
       dataset.mapValues(new DoubleWritable(_))
     }
     SparseMatrixIO.write(output, result, dumpText)
   }


NB: Cooccurrence is essentially an (Int,Int) pair, and SparseMatrixIO.load / write basically call newAPIHadoopFile and saveAsNewAPIHadoopFile.
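
(For context, a simplified sketch of roughly what those helpers do; the SequenceFile formats and the object name here are assumptions for illustration, not the real SparseMatrixIO.)

   import org.apache.hadoop.io.DoubleWritable
   import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
   import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
   import org.apache.spark.SparkContext
   import org.apache.spark.SparkContext._   // pair-RDD save methods
   import org.apache.spark.rdd.RDD

   // Hypothetical sketch: assumes Cooccurrence is a Writable key class and that the
   // matrices are stored as SequenceFiles of (Cooccurrence, DoubleWritable) pairs.
   object SparseMatrixIOSketch {
     def load(sc: SparkContext, path: String): RDD[(Cooccurrence, DoubleWritable)] =
       sc.newAPIHadoopFile[Cooccurrence, DoubleWritable,
                           SequenceFileInputFormat[Cooccurrence, DoubleWritable]](path)

     def write(path: String, rdd: RDD[(Cooccurrence, DoubleWritable)]): Unit =
       rdd.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[Cooccurrence, DoubleWritable]](path)
   }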

-- 
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

