spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: Reproducing the function of a Hadoop Reducer
Date Sat, 20 Sep 2014 18:09:21 GMT
OK so in Java - pardon the verbosity I might say something like the code
below
but I face the following issues
1) I need to store all values in memory as I run combineByKey - it I could
return an RDD which consumed values that would be great but I don't know
how to do that -
2) In my version of the functions I get a tuple so I know the key but all
of Scala's functtions for byKey do not make the key available - this may
work for a trivial function like wordcount but the code I want to port
needs to know the key when processing values
3) it is important the I have control over partitioning - I can do that
with mapPartition but it is also important that within a partition keys be
received in sorted order - easy if every partition could  a separate RDD -
combined later but I am not sure how that works.

in Hadoop then I reduce the values for each key I get an interator and do
not need to keep all values in memory. Similarly while the output in Hadoop
is written to disk as key values in Spark it could populate a JavaPairRDD
if there were a way to do that lazily

One other issue - I don't see a good way to say a merge function is
finished - i.e. no further data is coming in which would be useful in
processing steps.



    /**
     * a class to store a key and all its values
     *   using an array list
     * @param <K> key type
     * @param <V> value type
     */
    public static class KeyAndValues<K, V> {
        public final K key;
        private final ArrayList<V> values = new ArrayList<V>();
        public KeyAndValues(final K pKey) {
            key = pKey;
        }
         public void addValue(V added) {
            values.add(added);
        }
         public Iterable<V> getIterable() {
            return values;
        }
         public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
            values.addAll(merged.values);
            return this;
        }
    }

     // start function for combine by key - gets key from first tuple
       public static class CombineStartKeyAndValues<K, V> implements
Function<Tuple2<K,V>, KeyAndValues<K, V>> {
          public KeyAndValues call(Tuple2<K,V> x) {
            KeyAndValues ret = new KeyAndValues(x._1());
            ret.addValue(x._2());
            return ret;
        }
    }

    // continue function for combine by key -  adds values to array
     public static class CombineContinueKeyAndValues<K, V> implements
Function2< KeyAndValues< K,V>, Tuple2<K,V>, KeyAndValues<K, V>> {
         public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs, final
Tuple2<K,V> added) throws Exception {
            kvs.addValue(added._2());
            return kvs;
        }
    }

      // merge function - merges arrays - NOTE there is no signal to say
merge is done
     public static class CombineMergeKeyAndValues<K, V> implements
Function2< KeyAndValues<K, V>,KeyAndValues<K, V>,KeyAndValues<K, V>>
{
          public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1, final
KeyAndValues<K, V> v2) throws Exception {
            return null;
        }
    }

On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen <vtso@paxata.com>
wrote:

> So sorry about teasing you with the Scala. But the method is there in Java
> too, I just checked.
>
> On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen <vtso@paxata.com>
> wrote:
>
>> It might not be the same as a real hadoop reducer, but I think it would
>> accomplish the same. Take a look at:
>>
>> import org.apache.spark.SparkContext._
>> // val rdd: RDD[(K, V)]
>> // def zero(value: V): S
>> // def reduce(agg: S, value: V): S
>> // def merge(agg1: S, agg2: S): S
>> val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[Int](zero, reduce,
>> merge)
>> reducedUnsorted.sortByKey()
>>
>> On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis <lordjoe2000@gmail.com>
>> wrote:
>>
>>>  I am struggling to reproduce the functionality of a Hadoop reducer on
>>> Spark (in Java)
>>>
>>> in Hadoop I have a function
>>> public void doReduce(K key, Iterator<V> values)
>>> in Hadoop there is also a consumer (context write) which can be seen as
>>> consume(key,value)
>>>
>>> In my code
>>> 1) knowing the key is important to the function
>>> 2) there is neither one output tuple2 per key nor one output tuple2 per
>>> value
>>> 3) the number of values per key might be large enough that storing them
>>> in memory is impractical
>>> 4) keys must appear in sorted order
>>>
>>> one good example would run through a large document using a similarity
>>> function to look at the last 200 lines and output any of those with a
>>> similarity of more than 0.3 (do not suggest output all and filter - the
>>> real problem is more complex) the critical concern is an uncertain number
>>> of tuples per key.
>>>
>>> my questions
>>> 1) how can this be done - ideally a consumer would be a JavaPairRDD but
>>> I don't see how to create one and add items later
>>>
>>> 2) how do I handle the entire partition, sort, process (involving calls
>>> to doReduce) process
>>>
>>>
>>>
>>
>>
>


-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Mime
View raw message