spark-user mailing list archives

From Mark Hamstra <m...@clearstorydata.com>
Subject Re: Splitting into partitions and sorting the partitions ... how to do that?
Date Wed, 04 Dec 2013 19:12:56 GMT
You can easily accomplish the hashCode() % 32 partitioning using Spark's partitionBy:

import org.apache.spark.HashPartitioner

val strings: RDD[String] = ...
strings.map(s => (s, 1)).partitionBy(new HashPartitioner(32)).keys
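
A fuller sketch combining this with the per-partition sort and duplicate
removal asked for below (the names are mine; it assumes each partition's
contents fit in one worker's memory, since the sorting is done locally):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // implicit conversions for pair RDDs
import org.apache.spark.rdd.RDD

val strings: RDD[String] = ...
val result: RDD[String] = strings
  .map(s => (s, 1))                       // key each string by itself
  .partitionBy(new HashPartitioner(32))   // hash the keys into 32 partitions
  .keys
  // Equal strings hash to the same partition, so de-duplicating and
  // sorting each partition locally removes all duplicates globally.
  .mapPartitions(iter => iter.toList.distinct.sorted.iterator)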


On Wed, Dec 4, 2013 at 11:05 AM, Andrew Ash <andrew@andrewash.com> wrote:

> How important is it that they're partitioned on hashCode() % 32 rather
> than Spark's default partitioning?
>
> In Scala, you should be able to do this with
> rdd.distinct.coalesce(32).mapPartitions(iter => iter.toList.sorted.iterator)
>
> I'm not sure what your end goal is here, but if it's just to sort a bunch
> of data and remove duplicates, then that should be
> rdd.distinct.keyBy(identity).sortByKey().keys
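>
> For example, a toy run in the shell (assuming a SparkContext named sc):
>
> val rdd = sc.parallelize(Seq("b", "a", "b", "c"))
> rdd.distinct.keyBy(identity).sortByKey().keys.collect()
> // => Array(a, b, c)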
>
>
> On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
>
>> Thanks for your answer. But the problem is that I only want to sort the
>> 32 partitions individually, not the complete input. So yes, the output
>> has to consist of 32 partitions, each sorted.
>>
>> Ceriel Jacobs
>>
>>
>>
>> On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>>
>>> I am not sure if 32 partitions is a hard limit that you have.
>>>
>>> Unless you have a strong reason to use only 32 partitions, please try
>>> providing the optional second argument (numPartitions) to the reduceByKey
>>> and sortByKey methods, which will parallelize these reduce operations.
>>> A number 3x the total number of cores on the cluster would be a good
>>> value to try for numPartitions.
>>>
>>> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>>>
>>> In case you have to have 32 partitions in the final output, you can call
>>> the coalesce(32) method on your RDD when producing the final output.
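>>>
>>> For example (a sketch; the variable names are made up, and 192 = 3x the
>>> 64 total cores of the cluster described below):
>>>
>>> val reduced = pairs.reduceByKey((a, b) => a, 192) // 192 reduce tasks
>>> val sorted = reduced.sortByKey(true, 192)         // 192 sort tasks
>>> val output = sorted.coalesce(32)                  // 32 final partitions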
>>>
>>
>>> On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
>>>
>>>     Hi,
>>>
>>>     I am a novice to Spark, and need some help with the following problem:
>>>     I have a
>>>              JavaRDD<String> strings;
>>>     which is potentially large, hundreds of GBs, and I need to split them
>>>     into 32 partitions by means of hashCode() % 32, then sort these
>>>     partitions and remove duplicates. I am having trouble finding an
>>>     efficient way of expressing this in Spark. I think I need an RDD to
>>>     be able to sort, so in this case I need 32 of them. So I first
>>>     created an RDD with pairs <partitionNo, string>, like this:
>>>
>>>              JavaPairRDD<Integer, String> hashStrings = strings
>>>                      .keyBy(new Function<String, Integer>() {
>>>                          @Override
>>>                          public Integer call(String s) {
>>>                              // hashCode() can be negative, so normalize to 0..31.
>>>                              return ((s.hashCode() % 32) + 32) % 32;
>>>                          }
>>>                      });
>>>
>>>     And then I launch 32 threads that do the following (each thread has
>>>     its own partition):
>>>
>>>                  // Filter for my own partition
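>>>                  // (Each of the 32 threads runs this filter over the
>>>                  // whole RDD, so the full dataset is scanned 32 times.)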
>>>                  JavaPairRDD<Integer, String> filtered = hashStrings
>>>                          .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>>>                              @Override
>>>                              public Boolean call(Tuple2<Integer, String> tpl) {
>>>                                  return tpl._1 == partition;
>>>                              }
>>>                          });
>>>                  JavaRDD<String> values = filtered.values();
>>>
>>>                  // Pair with a boolean, so that we can use sortByKey().
>>>                  JavaPairRDD<String, Boolean> values1 = values
>>>                          .map(new PairFunction<String, String, Boolean>() {
>>>                              @Override
>>>                              public Tuple2<String, Boolean> call(String s) {
>>>                                  return new Tuple2<String, Boolean>(s, true);
>>>                              }
>>>                          });
>>>
>>>                  // Reduce by key to remove duplicates.
>>>                  JavaPairRDD<String, Boolean> reduced = values1
>>>                          .reduceByKey(new Function2<Boolean, Boolean, Boolean>() {
>>>                              @Override
>>>                              public Boolean call(Boolean i1, Boolean i2) {
>>>                                  return true;
>>>                              }
>>>                          });
>>>
>>>                  // Sort and extract keys.
>>>                  JavaRDD<String> result = reduced.sortByKey().keys();
>>>
>>>     This works for input that is not too large, but for larger input I get
>>>     all kinds of out-of-memory exceptions. I'm running on 8 nodes, each
>>>     with 8 cores, and am using SPARK_MEM=16G. I also tried
>>>     StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems
>>>     to make things much slower, and it still gives out-of-memory
>>>     exceptions.
>>>
>>>     Now I'm pretty sure that the way I obtain the partitions is really
>>>     inefficient, and I also have my doubts about starting the RDDs in
>>>     separate threads. So, what would be the best way to deal with this?
>>>
>>>     Thanks in advance for any hints that you can give me.
>>>
>>>     Ceriel Jacobs
>>>
>>>
>>>
>>
>
