spark-user mailing list archives

From Ceriel Jacobs <c.j.h.jac...@vu.nl>
Subject Splitting into partitions and sorting the partitions ... how to do that?
Date Wed, 04 Dec 2013 10:03:27 GMT
Hi,

I am a novice to Spark and need some help with the following problem:
I have a
         JavaRDD<String> strings;
which is potentially large (hundreds of GBs), and I need to split it
into 32 partitions by means of hashCode() % 32, then sort each partition
and remove duplicates. I am having trouble finding an efficient way to
express this in Spark. I think I need an RDD per partition to be able to sort,
so in this case I need 32 of them. So I first created an RDD of
<partitionNo, string> pairs, like this:

         JavaPairRDD<Integer, String> hashStrings = strings
                 .keyBy(new Function<String, Integer>() {
                     @Override
                     public Integer call(String s) {
                         // hashCode() can be negative, so mask with & 31
                         // (a non-negative % 32) and let autoboxing
                         // produce the Integer.
                         return s.hashCode() & 31;
                     }
                 });

And then I launch 32 threads, each of which runs the pipeline below for its own
partition (partition is a final int in scope; the driver loop is sketched after
the pipeline):

             // Filter for my own partition.
             JavaPairRDD<Integer, String> filtered = hashStrings
                     .filter(new Function<Tuple2<Integer, String>, Boolean>() {
                         @Override
                         public Boolean call(Tuple2<Integer, String> tpl) {
                             // Tuple2 fields are accessed as methods from Java;
                             // the Integer key unboxes to compare with the int.
                             return tpl._1() == partition;
                         }
                     });
             JavaRDD<String> values = filtered.values();

             // Pair with a boolean, so that we can use sortByKey().
             JavaPairRDD<String, Boolean> values1 =
                     values.map(new PairFunction<String, String, Boolean>() {
                         @Override
                         public Tuple2<String, Boolean> call(String s) {
                             return new Tuple2<String, Boolean>(s, true);
                         }
                     });

             // Reduce by key to remove duplicates.
             JavaPairRDD<String, Boolean> reduced =
                     values1.reduceByKey(
                             new Function2<Boolean, Boolean, Boolean>() {
                                 @Override
                                 public Boolean call(Boolean b1, Boolean b2) {
                                     // The value is just a placeholder;
                                     // only the keys matter.
                                     return true;
                                 }
                             });

             // Sort and extract keys.
             JavaRDD<String> result = reduced.sortByKey().keys();
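For completeness, the driver side looks roughly like this (just a sketch using
java.util.concurrent; runPartition(p) is a made-up name standing in for the
pipeline above run with partition = p):

             // Driver: submit one Spark job per partition from its own thread.
             ExecutorService pool = Executors.newFixedThreadPool(32);
             for (int p = 0; p < 32; p++) {
                 final int partition = p;
                 pool.submit(new Runnable() {
                     @Override
                     public void run() {
                         // Placeholder for the filter/dedup/sort pipeline above.
                         runPartition(partition);
                     }
                 });
             }
             pool.shutdown();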

This works for smallish input, but for larger input I get all kinds of out-of-memory
exceptions. I'm running on 8 nodes, each with 8 cores, and am using SPARK_MEM=16G.
I also tried StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems to
make things much slower, and it still gives out-of-memory exceptions.
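Concretely, I applied the storage level with persist() on each RDD right after
creating it, e.g.:

             // Likewise for filtered, values1, reduced, ...
             hashStrings.persist(StorageLevel.MEMORY_AND_DISK());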

Now I'm pretty sure that the way I obtain the partitions (a full filter pass over
the whole data set for each of the 32 partitions) is really inefficient, and I also
have my doubts about submitting the jobs from 32 separate threads. So what would be
the best way to deal with this?
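For instance, would a single job along these lines be the better direction? This is
just a sketch, assuming each of the 32 partitions fits in memory for the in-place
sort (which may be exactly my problem); it uses java.util.Collections and Spark's
FlatMapFunction:

             // One job: deduplicate into 32 hash partitions, then sort
             // each partition in memory.
             JavaRDD<String> distinct = strings.distinct(32);
             JavaRDD<String> sorted = distinct.mapPartitions(
                     new FlatMapFunction<Iterator<String>, String>() {
                         @Override
                         public Iterable<String> call(Iterator<String> it) {
                             List<String> list = new ArrayList<String>();
                             while (it.hasNext()) {
                                 list.add(it.next());
                             }
                             Collections.sort(list);
                             return list;
                         }
                     });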

Thanks in advance for any hints that you can give me.

Ceriel Jacobs
