spark-user mailing list archives

From Boromir Widas <vcsub...@gmail.com>
Subject GroupBy multiple attributes
Date Fri, 23 Jan 2015 16:03:43 GMT
Hello,

I am trying to do a groupBy on 5 attributes to get results in the form of a
pivot table, like in Microsoft Excel. The keys are the attribute tuples and
the values are double arrays (which may be very large). The code below
returns correct results, but I would like to optimize it further (I have
already experimented with numPartitions).

The two issues I see are:
1. flatMap is needed to expand the key tuples, but it also duplicates the
values; since the values are large, this inflates the shuffle input size
for reduceByKey. Is there a way to avoid the duplication?
2. reduceByKey adds two arrays element-wise and creates a new array for
every addition. Is there a way to reduce without creating a new array
every time (similar to what accumulators do)?
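
On the second point, one pattern worth considering is a combine function that
accumulates the right array into the left one in place, instead of allocating a
new result array per merge. Whether it is safe to mutate a value inside
reduceByKey depends on how the value arrays are shared in the job, so the
sketch below (the helper name `addInPlace` is hypothetical, not from this
thread) only illustrates the allocation-free element-wise addition itself,
outside Spark:

```scala
// Hypothetical helper: adds `src` into `dst` in place, avoiding the fresh
// array that (x, y).zipped.map(_ + _) allocates on every merge.
def addInPlace(dst: Array[Double], src: Array[Double]): Array[Double] = {
  require(dst.length == src.length)
  var i = 0
  while (i < dst.length) {
    dst(i) += src(i)   // accumulate element-wise into dst
    i += 1
  }
  dst                  // return the mutated left argument as the accumulator
}

val a = Array(1.0, 2.0, 3.0)
val b = Array(0.5, 0.5, 0.5)
addInPlace(a, b)
println(a.mkString(","))  // the sum is accumulated into `a` itself
```

With a mutating combiner like this, aggregateByKey (which is documented to
allow mutating its accumulator) may be a better fit than reduceByKey.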

I am pasting sample code, the query plan, and the output below.

Thanks.

val attributeToFloatArrayRDD = sc.parallelize(Array(
      ("A-1", "B-2", "C-1", "D-1", "E-1")   -> (0.0 to 1000.0 by 0.25).toArray
      , ("A-2", "B-1", "C-1", "D-2", "E-1") -> (5.0 to 1005.0 by 0.25).toArray
      , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
      , ("A-3", "B-3", "C-1", "D-1", "E-2") -> (0.0 to 1000.0 by 0.25).toArray
      , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
      , ("A-4", "B-3", "C-1", "D-1", "E-1") -> (8.0 to 1008.0 by 0.25).toArray
      , ("A-1", "B-1", "C-1", "D-1", "E-1") -> (0.0 to 1000.0 by 0.25).toArray
    ))


    // For each record, emit the full key tuple and all of its prefixes
    // (t1 alone is tagged "_top"), each paired with the same value array.
    val groupToVaRRDD = attributeToFloatArrayRDD
      .flatMap { case ((t1, t2, t3, t4, t5), values) =>
        Seq(t1 + "_top", (t1, t2), (t1, t2, t3), (t1, t2, t3, t4), (t1, t2, t3, t4, t5))
          .map(key => (key, values))
      }
      // Element-wise sum of all arrays sharing a key; note this allocates
      // a new array on every merge.
      .reduceByKey { (x, y) =>
        require(x.size == y.size)
        (x, y).zipped.map(_ + _)
      }
      // 5th-percentile element of each summed array (the VaR).
      .map { case (key, arr) => (key, arr.sorted.take(arr.size / 20).last) }
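
For clarity, the final map computes a 5th-percentile value: sort the summed
array ascending, keep the smallest 5%, and take the largest of those. A
standalone sketch of just that step (the helper name `fifthPercentile` is
mine, not from the job above):

```scala
// Illustration of the percentile step: sorted.take(size / 20).last keeps the
// bottom 5% of the values and returns the largest of them.
def fifthPercentile(values: Array[Double]): Double =
  values.sorted.take(values.size / 20).last

// 4001 values 0.0, 0.25, ..., 1000.0, like the sample data above.
val xs = (0 to 4000).map(_ * 0.25).toArray
println(fifthPercentile(xs))  // 4001/20 = 200 kept; the 200th is 49.75
```

The 49.75 here matches the rows for unsummed keys (e.g. the A-3 groups) in
the output below.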


==== Query Plan

(16) MappedRDD[12] at map at GroupByTest.scala:81 []
 |   ShuffledRDD[11] at reduceByKey at GroupByTest.scala:76 []
 +-(16) FlatMappedRDD[10] at flatMap at GroupByTest.scala:68 []
    |   ParallelCollectionRDD[9] at parallelize at GroupByTest.scala:56 []

==== Output


GroupBy 	VaR
(A-2,B-1) 	54.75
(A-2,B-1,C-1,D-2) 	54.75
(A-1,B-1) 	149.25
(A-1,B-1,C-1,D-1,E-1) 	149.25
(A-3,B-3,C-1) 	49.75
(A-3,B-3) 	49.75
(A-4,B-3,C-1,D-1,E-1) 	57.75
(A-2,B-1,C-1) 	54.75
(A-1,B-2,C-1,D-1,E-1) 	49.75
(A-1,B-1,C-1,D-1) 	149.25
(A-3,B-3,C-1,D-1,E-2) 	49.75
(A-1,B-2,C-1) 	49.75
(A-3,B-3,C-1,D-1) 	49.75
(A-4,B-3) 	57.75
(A-1,B-1,C-1) 	149.25
A-1_top 	199.0
(A-4,B-3,C-1,D-1) 	57.75
A-2_top 	54.75
(A-1,B-2) 	49.75
(A-4,B-3,C-1) 	57.75
A-3_top 	49.75
A-4_top 	57.75
(A-2,B-1,C-1,D-2,E-1) 	54.75
(A-1,B-2,C-1,D-1) 	49.75
