spark-user mailing list archives

From: Andrew Ash <and...@andrewash.com>
Subject: Re: Splitting into partitions and sorting the partitions ... how to do that?
Date: Wed, 04 Dec 2013 19:05:08 GMT
How important is it that they're partitioned on hashCode() % 32 rather than
Spark's default partitioning?

In Scala, you should be able to do this with
rdd.distinct.coalesce(32).mapPartitions(p => p.toSeq.sorted.iterator)
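
If the hashCode() % 32 placement actually matters, a rough (untested) sketch
with an explicit HashPartitioner would be the following. Note that Spark's
HashPartitioner places a key at a non-negative key.hashCode() mod
numPartitions, so unlike a raw Java %, it never yields a negative partition
number:

import org.apache.spark.HashPartitioner

rdd.map(s => (s, ()))                    // key each string by itself
   .partitionBy(new HashPartitioner(32)) // partition i holds keys with hashCode mod 32 == i
   .mapPartitions(p => p.map(_._1).toArray.sorted.distinct.iterator) // sort each partition, drop duplicates

Since duplicate strings hash to the same partition, the per-partition
.distinct after sorting removes duplicates globally, which is why a
separate distinct pass (and its extra shuffle) isn't needed here.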

I'm not sure what your end goal is here, but if it's just to sort a bunch of
data and remove duplicates, then that should be
rdd.distinct.keyBy(identity).sortByKey().map(_._1)
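
(If you need exactly 32 sorted output partitions from that version instead,
sortByKey also takes a numPartitions argument -- .sortByKey(true, 32) --
though the result is then range-partitioned by the sort rather than placed
by hashCode() % 32.)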


On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:

> Thanks for your answer. But the problem is that I only want to sort the 32
> partitions individually, not the complete input. So yes, the output has to
> consist of 32 partitions, each sorted.
>
> Ceriel Jacobs
>
>
>
> On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>
>> I am not sure if 32 partitions is a hard limit that you have.
>>
>> Unless you have a strong reason to use only 32 partitions, please try
>> providing the second, optional argument (numPartitions) to the reduceByKey
>> and sortByKey methods, which will parallelize these reduce operations.
>> A number 3x the number of total cores on the cluster would be a good
>> value to try for numPartitions.
>>
>> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>>
>> In case you have to have 32 partitions in the final output, you can use the
>> coalesce(32) method on your RDD at the time of final output, as sketched
>> below.
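>>
>> A rough, untested sketch of that in Scala (here strings stands for the
>> RDD[String] in question, and 192 = 3 x the 8 nodes x 8 cores mentioned
>> below):
>>
>>     val numPartitions = 192
>>     val result = strings
>>       .map(s => (s, ()))
>>       .reduceByKey((a, b) => a, numPartitions)  // remove duplicates with many reduce tasks
>>       .sortByKey(true, numPartitions)           // sort with the same parallelism
>>       .keys
>>       .coalesce(32)                             // shrink to 32 partitions only at the end
>>
>> (One caveat: after coalesce, each of the 32 output partitions is a
>> concatenation of sorted runs, not one single sorted run.)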
>>
>
>> On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
>>
>>     Hi,
>>
>>     I am a novice to Spark and need some help with the following problem:
>>     I have a
>>              JavaRDD<String> strings;
>>     which is potentially large, hundreds of GBs, and I need to split them
>>     into 32 partitions by means of hashCode() % 32, then sort these
>>     partitions, and also remove duplicates. I am having trouble finding an
>>     efficient way of expressing this in Spark. I think I need an RDD to be
>>     able to sort, so in this case I need 32 of them. So I first created an
>>     RDD with pairs <partitionNo, string>, like this:
>>
>>              JavaPairRDD<Integer, String> hashStrings = strings
>>                      .keyBy(new Function<String, Integer>() {
>>                          @Override
>>                          public Integer call(String s) {
>>                              // s.hashCode() can be negative, so take a
>>                              // non-negative remainder to keep keys in 0..31.
>>                              int h = s.hashCode() % 32;
>>                              return h < 0 ? h + 32 : h;
>>                          }
>>                      });
>>
>>     And then I launch 32 threads that do the following (each thread has its
>>     own partition):
>>
>>                  // Filter for my own partition.
>>                  JavaPairRDD<Integer, String> filtered = hashStrings
>>                          .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>>                              @Override
>>                              public Boolean call(Tuple2<Integer, String> tpl) {
>>                                  // Compare int values; == on two Integers
>>                                  // would compare object references.
>>                                  return tpl._1.intValue() == partition;
>>                              }
>>                          });
>>                  JavaRDD<String> values = filtered.values();
>>
>>                  // Pair with a boolean, so that we can use sortByKey().
>>                  JavaPairRDD<String, Boolean> values1 =
>>                          values.map(new PairFunction<String, String, Boolean>() {
>>                              @Override
>>                              public Tuple2<String, Boolean> call(String s) {
>>                                  return new Tuple2<String, Boolean>(s, true);
>>                              }
>>                          });
>>
>>                  // Reduce by key to remove duplicates.
>>                  JavaPairRDD<String, Boolean> reduced =
>>                          values1.reduceByKey(
>>                                  new Function2<Boolean, Boolean, Boolean>() {
>>                                      @Override
>>                                      public Boolean call(Boolean i1, Boolean i2) {
>>                                          return true;
>>                                      }
>>                                  });
>>
>>                  // Sort and extract keys.
>>                  JavaRDD<String> result = reduced.sortByKey().keys();
>>
>>     This works for not-so-large input, but for larger input I get all kinds
>>     of out-of-memory exceptions. I'm running on 8 nodes, each with 8 cores,
>>     and am using SPARK_MEM=16G. I also tried StorageLevel.MEMORY_AND_DISK()
>>     for all the RDDs, but that just seems to make things much slower, and
>>     still gives out-of-memory exceptions.
>>
>>     Now I'm pretty sure that the way I obtain the partitions is really
>>     inefficient, and I also have my doubts about starting the RDDs in
>>     separate threads. So, what would be the best way to deal with this?
>>
>>     Thanks in advance for any hints that you can give me.
>>
>>     Ceriel Jacobs
>>
>>
>>
>
