spark-user mailing list archives

From Tao Xiao <xiaotao.cs....@gmail.com>
Subject A partitionBy problem
Date Wed, 19 Nov 2014 07:06:53 GMT
Hi all,

     I tested the *partitionBy* feature in a wordcount application, and I'm
puzzled by a phenomenon. In this application, I created an RDD from some
text files in HDFS (about 100 GB in size), each of which has lines composed
of words separated by the character "#". I wanted to count the occurrence of
each distinct word. *All lines have the same contents, so the final result
should be very small in bytes*. The code is as follows:

      val text = sc.textFile(inputDir)
      val tuples = text.flatMap(line => line.split("#"))
                       .map((_, 1))
                       .reduceByKey(_ + _)
      tuples.collect.foreach { case (word, count) => println(word + " -> " + count) }
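
The expected output can be checked without a cluster. Here is a pure-Scala sketch (no Spark) of the same word count over a few identical sample lines, showing that the result collapses to one small entry per distinct word:

```scala
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    // Three identical lines stand in for the 100 GB of identical HDFS lines.
    val lines = Seq.fill(3)("AAA#BBB#CCC#DDD")

    // Same flatMap/split/count pipeline as the Spark job, on plain collections.
    val counts = lines.flatMap(_.split("#"))
                      .groupBy(identity)
                      .map { case (word, ws) => (word, ws.size) }

    counts.foreach { case (word, count) => println(word + " -> " + count) }
    // Only 4 distinct entries, no matter how many input lines there are.
  }
}
```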

I submitted the application to a Spark cluster of 5 nodes and ran it in
standalone mode. From the application UI
<http://imgbin.org/index.php?page=image&id=20976>, we can see that the
shuffle for *collect* and *reduceByKey* moved very little data
(766.4 KB shuffle read for *collect* and 961 KB shuffle write for
*reduceByKey*).

*However, the shuffle moved much more data when I added partitionBy like
this:*

      val text = sc.textFile(inputDir)
      val tuples = text.flatMap(line => line.split("#"))
                       .map((_, 1))
                       .partitionBy(new HashPartitioner(100))
                       .reduceByKey(_ + _)
      tuples.collect.foreach { case (word, count) => println(word + " -> " + count) }

From the application UI <http://imgbin.org/index.php?page=image&id=20977>,
we can see that the shuffle read for *collect* is 2.8 GB and the shuffle
write for *map* is 3.5 GB.

The *map* transformations are applied on 5 nodes of the cluster because the
HDFS blocks are distributed among these 5 nodes. The *map* transformation
is applied to each element of the RDD on its local node and should not
require shuffling the resulting RDD. *So my first question is: why did the
map stage produce such a large shuffle write (3.5 GB) when I added
partitionBy to the code?*
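
To make the question concrete, here is a pure-Scala sketch (no Spark) of the record counts involved, under two assumed behaviors: with a map-side combine, each node sums its pairs per key before the shuffle; without one, every raw (word, 1) pair crosses the shuffle as-is. (Whether partitionBy actually disables the combine is part of what I'm asking.)

```scala
object ShuffleRecordSketch {
  // Number of records one node would send through the shuffle under each
  // assumed behavior described above.
  def shuffledRecords(pairs: Seq[(String, Int)], mapSideCombine: Boolean): Int =
    if (mapSideCombine)
      pairs.groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }.size
    else
      pairs.size

  def main(args: Array[String]): Unit = {
    // One node's slice of the input: many copies of the same 4 words.
    val localPairs = Seq.fill(1000)(Seq("AAA", "BBB", "CCC", "DDD")).flatten.map((_, 1))
    println(shuffledRecords(localPairs, mapSideCombine = true))   // 4
    println(shuffledRecords(localPairs, mapSideCombine = false))  // 4000
  }
}
```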

When *collect* is applied, it needs to gather the results, namely the
(*word*, *totalCount*) tuples, from the 5 nodes to the driver. That process
should move very little data because all lines have the same contents, like
"AAA#BBB#CCC#DDD", which means the final result that *collect* retrieves
should be very small in bytes (for example, hundreds of KB). *So my second
question is: why did the collect action read such a large shuffle (2.8 GB)
when I added partitionBy to the code?*

*And the third question: when I call partitionBy on an RDD, it returns a
new RDD. Does that mean the RDD is immediately shuffled across nodes to
satisfy the supplied partitioner, or is the partitioner merely a marker
indicating how to partition the RDD later?*
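
For reference, here is a sketch of the partition assignment I believe a HashPartitioner performs at shuffle time (an assumption based on the usual non-negative-mod pattern, not verified against the Spark source). The question above is whether this assignment happens eagerly at the partitionBy call or only when an action triggers the job:

```scala
object HashPartitionSketch {
  // Assumed HashPartitioner logic: hashCode modulo numPartitions, with
  // negative results (Java's % can be negative) shifted back into range.
  def hashPartition(key: Any, numPartitions: Int): Int = {
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }

  def main(args: Array[String]): Unit = {
    println(hashPartition("AAA", 100))  // some stable value in [0, 100)
    println(hashPartition(-1, 100))     // 99, not -1
  }
}
```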

Thanks.
