spark-user mailing list archives

From Jacob Maloney <jmalo...@conversantmedia.com>
Subject Strange duplicates in data when scaling up
Date Thu, 16 Oct 2014 22:09:25 GMT
I have a flatMap function that should not be able to emit duplicates, and yet it does. The
output of my function is a HashSet, so the function itself cannot output duplicates, yet I see
many copies of some keys emitted from it (in one case up to 62). The curious thing is that I
can't get this to happen until I increase the input to about 100,000 lines. For example:
(350000000087005221,[[(80530632,0.20824391739360665)], [(80530632,0.20824391739360665)]])

This expands to 62 identical lines:
(350000000087005221,(80530632,0.37312230565577803))
(350000000087005221,(80530632,0.37312230565577803))
(350000000087005221,(80530632,0.37312230565577803))
...
(350000000087005221,(80530632,0.37312230565577803))
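The value itself looks correct; only the repetition is wrong. Both grouped lists carry the same similarity s = 0.20824391739360665 for item 80530632, so in the aggregate() method of the code below max = s, the sum of the remaining values is s, and with MAX_RECO_VALUE = 1.0 the result is s + (1 - s)·s = 2s - s² ≈ 0.3731. A minimal standalone check of that arithmetic, assuming MAX_RECO_VALUE = 1.0 as in the code (the class name AggregateCheck is hypothetical, and it uses Double.isNaN for the NaN test):

```java
import java.util.Arrays;
import java.util.List;

public class AggregateCheck {
    // Same formula as AggregateLikeSims.aggregate(); maxReco plays the role of MAX_RECO_VALUE.
    static double aggregate(List<Double> sims, double maxReco) {
        if (sims.size() == 1) {
            return sims.get(0);
        }
        double max = 0;
        double sum = 0;
        for (double sim : sims) {
            if (sim > max) {
                max = sim;
            }
            if (!Double.isNaN(sim)) {
                sum += sim;
            }
        }
        sum -= max; // sum of all values except the maximum
        return max + (maxReco - max) * sum / ((sims.size() - 1) * maxReco);
    }

    public static void main(String[] args) {
        double s = 0.20824391739360665;
        // Both inner lists of the grouped record contribute the pair (80530632, s).
        System.out.println(aggregate(Arrays.asList(s, s), 1.0));
    }
}
```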

If I run this line alone as input, I get only the one line of output, as expected. The problem
only appears once the input grows large.

My code is as follows:
JavaPairRDD<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> preAggData =
        indidKeyedJoinedData.groupByKey();

JavaPairRDD<Long, Tuple2<Integer, Double>> aggregatedData =
        preAggData.flatMapToPair(new AggregateLikeSims());

where AggregateLikeSims is defined as:

// Imports needed by the enclosing file:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

static class AggregateLikeSims implements PairFlatMapFunction<
        Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>>, Long, Tuple2<Integer, Double>> {

    // Instance fields: shared by every call() on this object; output is never cleared.
    HashSet<Tuple2<Long, Tuple2<Integer, Double>>> output =
            new HashSet<Tuple2<Long, Tuple2<Integer, Double>>>();
    Map<Integer, List<Double>> intermediateMap = new HashMap<Integer, List<Double>>();
    Iterator<Tuple2<Integer, Double>> intIterator;
    Tuple2<Integer, Double> currentTuple;
    Double MAX_RECO_VALUE = 1.0;
    Iterator<Iterable<Tuple2<Integer, Double>>> itIterator;
    Accumulator<Integer> accum;

    @Override
    public Iterable<Tuple2<Long, Tuple2<Integer, Double>>> call(
            Tuple2<Long, Iterable<Iterable<Tuple2<Integer, Double>>>> inTuple) {
        // Collect every similarity value per item id across all grouped lists.
        itIterator = inTuple._2.iterator();
        while (itIterator.hasNext()) {
            intIterator = itIterator.next().iterator();
            while (intIterator.hasNext()) {
                currentTuple = intIterator.next();
                if (intermediateMap.containsKey(currentTuple._1)) {
                    intermediateMap.get(currentTuple._1).add(currentTuple._2);
                } else {
                    List<Double> listOfDoubles = new ArrayList<Double>();
                    listOfDoubles.add(currentTuple._2);
                    intermediateMap.put(currentTuple._1, listOfDoubles);
                }
            }
        }

        // Add one (key, (itemId, similarity)) tuple per item id, emptying the map as we go.
        Iterator<Map.Entry<Integer, List<Double>>> it = intermediateMap.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, List<Double>> pairs = it.next();
            if (pairs.getValue().size() > 1) {
                output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                        new Tuple2<Integer, Double>(pairs.getKey(), aggregate(pairs.getValue()))));
            } else {
                output.add(new Tuple2<Long, Tuple2<Integer, Double>>(inTuple._1,
                        new Tuple2<Integer, Double>(pairs.getKey(), pairs.getValue().get(0))));
            }
            it.remove();
        }

        return output;
    }

    private double aggregate(List<Double> simsList) {
        if (simsList == null) {
            return 0;
        }
        if (simsList.size() == 1) {
            return simsList.get(0);
        }

        double max = 0;
        double sum = 0;

        // Find the max and sum up all non-NaN elements of the list.
        for (double sim : simsList) {
            if (sim > max) {
                max = sim;
            }
            // NaN compares unequal to every value (even Double.NaN), so test with isNaN().
            if (!Double.isNaN(sim)) {
                sum = sum + sim;
            }
        }
        sum = sum - max;
        double agr = max + (MAX_RECO_VALUE - max) * sum
                / (double) ((simsList.size() - 1) * MAX_RECO_VALUE);

        return agr;
    }

    public void setAccum(Accumulator<Integer> in) {
        accum = in;
    }
}
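One plausible explanation, assuming Spark applies a single deserialized AggregateLikeSims instance to every record of a partition within a task (which is how functions passed to RDD transformations are executed): output is an instance field that is never cleared, so each call() returns the results of all earlier calls in that partition as well. A key produced early in a partition would then be re-emitted once for every later record, and the copy count grows with partition size, which would fit the problem only appearing at larger inputs. The sketch below models this without Spark; all class and method names in it are hypothetical, and flatMapPartition is a simplified stand-in for flatMap over one partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.function.Function;

public class ReusedFunctionDemo {

    // Mirrors the field layout of AggregateLikeSims: the result set outlives each call.
    static class FieldStateFunction implements Function<String, Iterable<String>> {
        private final HashSet<String> output = new HashSet<>();

        @Override
        public Iterable<String> apply(String key) {
            output.add(key);
            return output; // still contains every key added by earlier calls
        }
    }

    // Same logic, but the set is created fresh for every record.
    static class LocalStateFunction implements Function<String, Iterable<String>> {
        @Override
        public Iterable<String> apply(String key) {
            HashSet<String> output = new HashSet<>();
            output.add(key);
            return output;
        }
    }

    // Simplified model of flatMap over one partition: one function instance handles every
    // record in turn, and each returned Iterable is drained before the next call.
    static List<String> flatMapPartition(List<String> records,
                                         Function<String, Iterable<String>> f) {
        List<String> emitted = new ArrayList<>();
        for (String record : records) {
            for (String out : f.apply(record)) {
                emitted.add(out);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c", "d");
        List<String> fieldState = flatMapPartition(partition, new FieldStateFunction());
        List<String> localState = flatMapPartition(partition, new LocalStateFunction());
        System.out.println("field state: " + fieldState.size() + " records, \"a\" emitted "
                + Collections.frequency(fieldState, "a") + " times");
        System.out.println("local state: " + localState.size() + " records, \"a\" emitted "
                + Collections.frequency(localState, "a") + " times");
    }
}
```

With four records, the field-based version emits 10 records and repeats "a" four times, while the local version emits each key once. If this reading is right, declaring output (and, for safety, intermediateMap and the iterators) as local variables inside call() should remove the duplicates.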




