You can easily accomplish the hashmod 32 using Spark's partitionBy:

val strings: RDD[String] = ...
strings.map(s => (s, 1)).partitionBy(new HashPartitioner(32)).keys


On Wed, Dec 4, 2013 at 11:05 AM, Andrew Ash <andrew@andrewash.com> wrote:
How important is it that they're partitioned on hashcode() % 32 rather than Spark's default partitioning?

In scala, you should be able to do this with rdd.distinct.coalesce(32).mapPartitions(p => sorted(p))

I'm not sure what your end goal is here, but if it's just sort a bunch of data and remove duplicates, then that should be rdd.distinct.keyBy(_).sortByKey().map( (k,v) => k)


On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
Thanks for your answer. But the problem is that I only want to sort the 32 partitions, individually,
not the complete input. So yes, the output has to consist of 32 partitions, each sorted.

Ceriel Jacobs



On 12/04/2013 06:30 PM, Ashish Rangole wrote:
I am not sure if 32 partitions is a hard limit that you have.

Unless you have a strong reason to use only 32 partitions, please try providing the second optional
argument (numPartitions) to reduceByKey and sortByKey methods which will paralellize these Reduce operations.
A number 3x the number of total cores on the cluster would be a good value to try for numPartitions.

http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks

In case you have to have 32 partitions in the final output, you can use coalesce(32) method on your
RDD at the time of final output.

On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl <mailto:c.j.h.jacobs@vu.nl>> wrote:

    Hi,

    I am a novice to SPARK, and need some help with the following problem:
    I have a
             JavaRDD<String> strings;
    which is potentially large, hundreds of GBs, and I need to split them
    into 32 partitions, by means of hashcode()%32, and then sort these partitions,
    and also remove duplicates. I am having trouble finding an efficient way of
    expressing this in SPARK. I think I need an RDD to be able to sort, so in
    this case, I need 32 of them. So I first created an RDD with pairs <partitionNo, string>,
    like this:

             JavaPairRDD<Integer, String> hashStrings = strings
                     .keyBy(new Function<String, Integer>() {
                         @Override
                         public Integer call(String s) {
                             return new Integer(s.hashCode() % 32);
                         }
                     });

    And then I launch 32 threads that do the following (each thread has its own partition):

                 // Filter for my own partition
                 JavaPairRDD<Integer, String> filtered = hashStrings
                         .filter(new Function<Tuple2<Integer, String>, Boolean>() {
                             @Override
                             public Boolean call(Tuple2<Integer, String> tpl) {
                                 return tpl._1 == partition;
                             }
                         });
                 JavaRDD<String> values = filtered.values();

                 // Pair with a boolean, so that we can use sortByKey().
                 JavaPairRDD<String, Boolean> values1 =
                         values.map(new PairFunction<String, String, Boolean>() {
                             @Override
                             public Tuple2<String, Boolean> call(String s) {
                                 return new Tuple2<String, Boolean>(s, true);
                             }
                         });

                 // Reduce by key to remove duplicates.
                 JavaPairRDD<String, Boolean> reduced =
                         values1.reduceByKey(
                                 new Function2<Boolean, Boolean, Boolean>() {
                                     @Override
                                     public Boolean call(Boolean i1,
                                             Boolean i2) {
                                         // return i1 + i2;
                                         return true;
                                     }
                                 });

                 // Sort and extract keys.
                 JavaRDD<String> result = reduced.sortByKey().keys();

    This works for not so large input, but for larger I get all kinds of out-of-memory
    exceptions. I'm running on 8 nodes, each with 8 cores, and am using SPARK_MEM=16G.
    I also tried  StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems to
    make things much slower, and still gives out-of-memory exceptions.

    Now I'm pretty sure that the way I obtain the partitions is really inefficient, and I also
    have my doubts about starting the RDDs in separate threads. So, what would be the best way
    to deal with this?

    Thanks in advance for any hints that you can give me.

    Ceriel Jacobs