spark-user mailing list archives

From Ceriel Jacobs <c.j.h.jac...@vu.nl>
Subject Re: Splitting into partitions and sorting the partitions ... how to do that?
Date Wed, 04 Dec 2013 19:25:19 GMT
Thanks for your answer.
The partitioning function is not that important. What is important is that I only sort the
partitions, not the complete RDD. Your suggestion to use
rdd.distinct.coalesce(32).mapPartitions(p => sorted(p)) sounds nice, and I had indeed seen
the coalesce method and the mapPartitions method, but I could not (and still cannot) figure
out how to express the (p => sorted(p)) part in Spark/Java. The only way to sort that I could
find in the Spark documentation is sortByKey, but for that you need an RDD per partition, I think.
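
For reference, one way the (p => sorted(p)) step could look with the Java API; a minimal
sketch, assuming the 0.8-era API in which FlatMapFunction.call returns an Iterable (later
releases return an Iterator), and assuming each of the 32 partitions fits in one worker's memory:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;

    // Deduplicate globally, shrink to 32 partitions, then sort each
    // partition locally inside mapPartitions.
    JavaRDD<String> perPartitionSorted = strings
            .distinct()
            .coalesce(32)
            .mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
                @Override
                public Iterable<String> call(Iterator<String> it) {
                    // Materialize this partition in memory and sort it.
                    List<String> partition = new ArrayList<String>();
                    while (it.hasNext()) {
                        partition.add(it.next());
                    }
                    Collections.sort(partition);
                    return partition;
                }
            });

The output keeps 32 partitions, each sorted internally, without a global sort.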

Ceriel Jacobs


On 12/04/2013 08:05 PM, Andrew Ash wrote:
> How important is it that they're partitioned on hashCode() % 32 rather than with Spark's default partitioning?
>
> In Scala, you should be able to do this with rdd.distinct.coalesce(32).mapPartitions(p => sorted(p))
>
> I'm not sure what your end goal is here, but if it's just to sort a bunch of data and remove duplicates, then that should be
> rdd.distinct.keyBy(identity).sortByKey().map(_._1)
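>
> A Java rendering of the same idea could look roughly like the sketch below (illustrative only; it pairs each string with a dummy value so that sortByKey applies, and the variable names are made up for the example):
>
>     import org.apache.spark.api.java.JavaPairRDD;
>     import org.apache.spark.api.java.JavaRDD;
>     import org.apache.spark.api.java.function.PairFunction;
>     import scala.Tuple2;
>
>     // Dedupe, key each string by itself (with a dummy value), sort globally.
>     JavaPairRDD<String, Boolean> keyed = strings.distinct().map(
>             new PairFunction<String, String, Boolean>() {
>                 @Override
>                 public Tuple2<String, Boolean> call(String s) {
>                     return new Tuple2<String, Boolean>(s, true);
>                 }
>             });
>     JavaRDD<String> sortedStrings = keyed.sortByKey().keys();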
>
>
> On Wed, Dec 4, 2013 at 9:45 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
>
>     Thanks for your answer. But the problem is that I only want to sort the 32 partitions individually,
>     not the complete input. So yes, the output has to consist of 32 partitions, each sorted.
>
>     Ceriel Jacobs
>
>
>
>     On 12/04/2013 06:30 PM, Ashish Rangole wrote:
>
>         I am not sure if 32 partitions is a hard limit that you have.
>
>         Unless you have a strong reason to use only 32 partitions, please try providing the second
>         optional argument (numPartitions) to the reduceByKey and sortByKey methods, which will
>         parallelize these reduce operations. A number 3x the total number of cores on the cluster
>         would be a good value to try for numPartitions.
>
>         http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>
>         In case you have to have 32 partitions in the final output, you can use the coalesce(32)
>         method on your RDD at the time of final output.
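>
>         As an illustration, passing numPartitions via the Java API could look like the sketch
>         below. Here 192 is 3x the 64 total cores mentioned in the original post; the sketch
>         assumes your Spark version's Java API exposes these numPartitions overloads (the Scala
>         API does), and it reuses values1 from the code quoted further down:
>
>             // Spread the reduce and the sort across 192 tasks instead of the
>             // default parallelism (assumed overloads; values1 is the
>             // JavaPairRDD<String, Boolean> built in the original post below).
>             int numPartitions = 192;
>             JavaPairRDD<String, Boolean> reduced = values1.reduceByKey(
>                     new Function2<Boolean, Boolean, Boolean>() {
>                         @Override
>                         public Boolean call(Boolean b1, Boolean b2) {
>                             return true; // value is a dummy; only keys matter
>                         }
>                     }, numPartitions);
>             JavaRDD<String> result = reduced.sortByKey(true, numPartitions).keys();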
>
>
>         On Wed, Dec 4, 2013 at 3:03 AM, Ceriel Jacobs <c.j.h.jacobs@vu.nl> wrote:
>
>              Hi,
>
>              I am a novice to Spark, and need some help with the following problem:
>              I have a
>                       JavaRDD<String> strings;
>              which is potentially large, hundreds of GBs, and I need to split them
>              into 32 partitions by means of hashCode() % 32, then sort these partitions
>              and also remove duplicates. I am having trouble finding an efficient way of
>              expressing this in Spark. I think I need an RDD to be able to sort, so in
>              this case I need 32 of them. So I first created an RDD with pairs
>              <partitionNo, string>, like this:
>
>                       JavaPairRDD<Integer, String> hashStrings = strings
>                               .keyBy(new Function<String, Integer>() {
>                                   @Override
>                                   public Integer call(String s) {
>                                       // hashCode() can be negative, so normalize the
>                                       // result of % 32 into the range 0..31:
>                                       return Integer.valueOf(((s.hashCode() % 32) + 32) % 32);
>                                   }
>                               });
>
>              And then I launch 32 threads that do the following (each thread has its own partition):
>
>                           // Filter for my own partition ('partition' is this
>                           // thread's number, 0..31).
>                           JavaPairRDD<Integer, String> filtered = hashStrings
>                                   .filter(new Function<Tuple2<Integer, String>, Boolean>() {
>                                       @Override
>                                       public Boolean call(Tuple2<Integer, String> tpl) {
>                                           return tpl._1 == partition;
>                                       }
>                                   });
>                           JavaRDD<String> values = filtered.values();
>
>                           // Pair with a boolean, so that we can use sortByKey().
>                           JavaPairRDD<String, Boolean> values1 =
>                                   values.map(new PairFunction<String, String, Boolean>() {
>                                       @Override
>                                       public Tuple2<String, Boolean> call(String s) {
>                                           return new Tuple2<String, Boolean>(s, true);
>                                       }
>                                   });
>
>                           // Reduce by key to remove duplicates.
>                           JavaPairRDD<String, Boolean> reduced =
>                                   values1.reduceByKey(
>                                           new Function2<Boolean, Boolean, Boolean>() {
>                                               @Override
>                                               public Boolean call(Boolean i1, Boolean i2) {
>                                                   return true;
>                                               }
>                                           });
>
>                           // Sort and extract keys.
>                           JavaRDD<String> result = reduced.sortByKey().keys();
>
>              This works for not-so-large input, but for larger input I get all kinds of out-of-memory
>              exceptions. I'm running on 8 nodes, each with 8 cores, and am using SPARK_MEM=16G.
>              I also tried StorageLevel.MEMORY_AND_DISK() for all the RDDs, but that just seems to
>              make things much slower, and still gives out-of-memory exceptions.
>
>              Now I'm pretty sure that the way I obtain the partitions is really inefficient, and I also
>              have my doubts about starting the RDDs in separate threads. So, what would be the best way
>              to deal with this?
>
>              Thanks in advance for any hints that you can give me.
>
>              Ceriel Jacobs
>