spark-user mailing list archives

From Victor Tso-Guillen <v...@paxata.com>
Subject Re: Reproducing the function of a Hadoop Reducer
Date Sun, 21 Sep 2014 04:32:51 GMT
   1. Actually, I disagree that combineByKey requires all values for a key
   to be held in memory. Only groupByKey does that; reduceByKey, foldByKey,
   and the generic combineByKey do not necessarily have that requirement.
   If your combine logic really shrinks the result value by a lot, I think
   it would be worth making sure mapSideCombine is true.
   2. To get the key into the combine logic, you may need to project each
   record into a (K, (K, V)). I'm not sure there's a method that otherwise
   provides the information you're asking for. Unfortunately, that is a lot
   more heavyweight.
   3. If you absolutely need the keys in sorted order before you combine,
   then perhaps you could sortByKey before doing your combineByKey, but you
   pay the price of a bigger shuffle doing it that way.
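
To make points 1 and 2 concrete, here is a minimal plain-Java sketch of the
three-function contract that combineByKey follows. It does not use Spark at
all: Pair, Stats, combinePartition, mergePartitions and keyed are hypothetical
names made up for this illustration, and the count/sum aggregate is just an
assumed example of combine logic that shrinks its input. The idea is that the
combiner stays constant-size however many values arrive for a key, and that
the key reaches the combine logic because each value carries it, as in
(K, (K, V)).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of the combineByKey contract. This is NOT Spark code;
// every name below is hypothetical and exists only for illustration.
final class CombineByKeySketch {

    // Stand-in for scala.Tuple2.
    static final class Pair<A, B> {
        final A key;
        final B value;
        Pair(A key, B value) { this.key = key; this.value = value; }
    }

    // Constant-size combiner: keeps the key, a count and a sum, never the values.
    static final class Stats {
        final String key;
        long count;
        long sum;
        Stats(String key) { this.key = key; }
        Stats add(long v) { count++; sum += v; return this; }
        Stats merge(Stats other) { count += other.count; sum += other.sum; return this; }
    }

    // Folds one partition into per-key combiners, one record at a time
    // (the role a map-side combine would play).
    static <K extends Comparable<K>, V, C> Map<K, C> combinePartition(
            Iterable<Pair<K, V>> records,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue) {
        Map<K, C> out = new TreeMap<>(); // TreeMap only so keys iterate in sorted order
        for (Pair<K, V> r : records) {
            C current = out.get(r.key);
            out.put(r.key, current == null
                    ? createCombiner.apply(r.value)
                    : mergeValue.apply(current, r.value));
        }
        return out;
    }

    // Merges the per-partition combiners, as would happen after a shuffle.
    static <K extends Comparable<K>, C> Map<K, C> mergePartitions(
            List<Map<K, C>> partials, BinaryOperator<C> mergeCombiners) {
        Map<K, C> out = new TreeMap<>();
        for (Map<K, C> partial : partials) {
            partial.forEach((k, c) -> out.merge(k, c, mergeCombiners));
        }
        return out;
    }

    // Projects (k, v) into (k, (k, v)) so the combine functions can see the key.
    static Pair<String, Pair<String, Long>> keyed(String k, long v) {
        return new Pair<>(k, new Pair<>(k, Long.valueOf(v)));
    }

    static Map<String, Stats> demo() {
        // Two made-up partitions of keyed records.
        List<Pair<String, Pair<String, Long>>> p1 =
                Arrays.asList(keyed("b", 4), keyed("a", 1), keyed("a", 2));
        List<Pair<String, Pair<String, Long>>> p2 =
                Arrays.asList(keyed("a", 10), keyed("c", 7));
        Function<Pair<String, Long>, Stats> create = kv -> new Stats(kv.key).add(kv.value);
        BiFunction<Stats, Pair<String, Long>, Stats> mergeValue = (s, kv) -> s.add(kv.value);
        List<Map<String, Stats>> partials = Arrays.asList(
                combinePartition(p1, create, mergeValue),
                combinePartition(p2, create, mergeValue));
        return mergePartitions(partials, Stats::merge);
    }

    public static void main(String[] args) {
        demo().forEach((k, s) ->
                System.out.println(k + " count=" + s.count + " sum=" + s.sum));
    }
}
```

In real Spark code the three lambdas would correspond to the createCombiner,
mergeValue and mergeCombiners arguments of combineByKey. If your per-key logic
cannot be expressed as a small aggregate like Stats, this pattern will not
save you any memory.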

I hope that helps. If not, perhaps you can sketch out in more detail what
you're trying to accomplish and I or someone else can guide you through.

Cheers,
Y

On Sat, Sep 20, 2014 at 11:09 AM, Steve Lewis <lordjoe2000@gmail.com> wrote:

> OK, so in Java (pardon the verbosity) I might write something like the code
> below, but I face the following issues:
> 1) I need to store all values in memory as I run combineByKey - if I could
> return an RDD which consumed values that would be great but I don't know
> how to do that -
> 2) In my version of the functions I get a tuple so I know the key but all
> of Scala's functions for byKey do not make the key available - this may
> work for a trivial function like wordcount but the code I want to port
> needs to know the key when processing values
> 3) it is important that I have control over partitioning - I can do that
> with mapPartitions but it is also important that within a partition keys be
> received in sorted order - easy if every partition could be a separate RDD -
> combined later but I am not sure how that works.
>
> In Hadoop, when I reduce the values for each key I get an iterator and do
> not need to keep all values in memory. Similarly, while the output in Hadoop
> is written to disk as key values, in Spark it could populate a JavaPairRDD
> if there were a way to do that lazily.
>
> One other issue - I don't see a good way to say a merge function is
> finished - i.e. no further data is coming in which would be useful in
> processing steps.
>
>
>
>     /**
>      * a class to store a key and all its values
>      *   using an array list
>      * @param <K> key type
>      * @param <V> value type
>      */
>     public static class KeyAndValues<K, V> {
>         public final K key;
>         private final ArrayList<V> values = new ArrayList<V>();
>         public KeyAndValues(final K pKey) {
>             key = pKey;
>         }
>          public void addValue(V added) {
>             values.add(added);
>         }
>          public Iterable<V> getIterable() {
>             return values;
>         }
>          public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
>             values.addAll(merged.values);
>             return this;
>         }
>     }
>
>     // start function for combineByKey - gets the key from the first tuple
>     public static class CombineStartKeyAndValues<K, V>
>             implements Function<Tuple2<K, V>, KeyAndValues<K, V>> {
>         public KeyAndValues<K, V> call(Tuple2<K, V> x) {
>             KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
>             ret.addValue(x._2());
>             return ret;
>         }
>     }
>
>     // continue function for combineByKey - adds values to the array
>     public static class CombineContinueKeyAndValues<K, V>
>             implements Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
>         public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs,
>                 final Tuple2<K, V> added) throws Exception {
>             kvs.addValue(added._2());
>             return kvs;
>         }
>     }
>
>     // merge function - merges arrays - NOTE there is no signal to say
>     // merge is done
>     public static class CombineMergeKeyAndValues<K, V>
>             implements Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
>         public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1,
>                 final KeyAndValues<K, V> v2) throws Exception {
>             // append v2's values to v1 and return the combined holder
>             return v1.merge(v2);
>         }
>     }
>
> On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen <vtso@paxata.com>
> wrote:
>
>> So sorry about teasing you with the Scala. But the method is there in
>> Java too, I just checked.
>>
>> On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen <vtso@paxata.com>
>> wrote:
>>
>>> It might not be the same as a real hadoop reducer, but I think it would
>>> accomplish the same. Take a look at:
>>>
>>> import org.apache.spark.SparkContext._
>>> // val rdd: RDD[(K, V)]
>>> // def zero(value: V): S
>>> // def reduce(agg: S, value: V): S
>>> // def merge(agg1: S, agg2: S): S
>>> val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce,
>>> merge)
>>> reducedUnsorted.sortByKey()
>>>
>>> On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis <lordjoe2000@gmail.com>
>>> wrote:
>>>
>>>>  I am struggling to reproduce the functionality of a Hadoop reducer on
>>>> Spark (in Java)
>>>>
>>>> in Hadoop I have a function
>>>> public void doReduce(K key, Iterator<V> values)
>>>> in Hadoop there is also a consumer (context write) which can be seen as
>>>> consume(key,value)
>>>>
>>>> In my code
>>>> 1) knowing the key is important to the function
>>>> 2) there is neither one output tuple2 per key nor one output tuple2 per
>>>> value
>>>> 3) the number of values per key might be large enough that storing them
>>>> in memory is impractical
>>>> 4) keys must appear in sorted order
>>>>
>>>> one good example would run through a large document using a similarity
>>>> function to look at the last 200 lines and output any of those with a
>>>> similarity of more than 0.3 (do not suggest output all and filter - the
>>>> real problem is more complex) the critical concern is an uncertain number
>>>> of tuples per key.
>>>>
>>>> my questions
>>>> 1) how can this be done - ideally a consumer would be a JavaPairRDD but
>>>> I don't see how to create one and add items later
>>>>
>>>> 2) how do I handle the entire partition, sort, process sequence (the
>>>> process step involving calls to doReduce)
>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>
