spark-dev mailing list archives

From Joseph Bradley <jos...@databricks.com>
Subject Re: Decrease shuffle in TreeAggregate with coalesce ?
Date Wed, 27 Apr 2016 17:41:48 GMT
Do you have code which can reproduce this performance drop in treeReduce?
It would be helpful to debug.  In the 1.6 release, we profiled it via the
various MLlib algorithms and did not see performance drops.

It's not just renumbering the partitions; it is reducing the number of
partitions by a factor of 1.0/scale (where scale > 1).  This creates a
"tree"-structured aggregation so that more of the work of merging during
aggregation is done on the workers, not the driver.
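A quick way to see how that loop shrinks the partition count level by level: the sketch below is a hypothetical pure-Python re-derivation of treeAggregate's while condition (not Spark code; the function name `tree_levels` is made up for illustration).

```python
import math

def tree_levels(num_partitions: int, scale: int) -> list[int]:
    """Return the partition count at each level of the aggregation tree."""
    levels = [num_partitions]
    # Mirrors RDD.treeAggregate's loop condition: keep dividing by `scale`
    # until the remaining merge work is small enough to finish on the driver.
    while num_partitions > scale + math.ceil(num_partitions / scale):
        num_partitions //= scale
        levels.append(num_partitions)
    return levels

# With 1000 partitions and scale = 2, the tree collapses
# 1000 -> 500 -> 250 -> ... -> 3 before the driver does the final merge.
print(tree_levels(1000, 2))
```

Each intermediate level is a distributed reduce, so the driver only ever merges the last handful of partial results instead of all 1000.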

On Wed, Apr 27, 2016 at 4:46 AM, Guillaume Pitel <guillaume.pitel@exensa.com
> wrote:

> Hi,
>
> I've been looking at the code of RDD.treeAggregate, because we've seen a
> huge performance drop between 1.5.2 and 1.6.1 on a treeReduce. I think the
> treeAggregate code hasn't changed, so my message is not about the
> performance drop, but a more general remark about treeAggregate.
>
> In treeAggregate, after the aggregate is applied inside the original
> partitions, we enter the tree:
>
> while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
>   numPartitions /= scale
>   val curNumPartitions = numPartitions
>   partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
>     (i, iter) => iter.map((i % curNumPartitions, _))
>   }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
> }
>
>
> The two lines where the partitions are numbered, then renumbered and
> reduced by key, seem suboptimal to me. There is a huge shuffle cost,
> while a simple coalesce followed by a partition-level aggregation would
> probably do the job perfectly.
>
> Have I missed something that requires this reshuffle?
>
> Best regards
> Guillaume Pitel
>
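The two merge strategies the thread contrasts can be sketched without Spark. Below is a hypothetical pure-Python illustration (the function names and the integer "partitions" are made up); `shuffle_merge` mimics the key-by-index-then-reduceByKey path, `coalesce_merge` mimics merging adjacent partitions locally.

```python
def shuffle_merge(partitions, scale):
    """treeAggregate-style: key partition i by i % curNumPartitions, then
    merge everything sharing a key (in Spark this is a hash-partitioned
    shuffle)."""
    cur = max(1, len(partitions) // scale)
    buckets = {}
    for i, part in enumerate(partitions):
        key = i % cur
        buckets[key] = buckets.get(key, 0) + part
    return [buckets[k] for k in sorted(buckets)]

def coalesce_merge(partitions, scale):
    """coalesce-style: merge groups of `scale` adjacent partitions locally,
    avoiding the hash shuffle when locality allows."""
    return [sum(partitions[i:i + scale])
            for i in range(0, len(partitions), scale)]

parts = [1, 2, 3, 4, 5, 6, 7, 8]
print(shuffle_merge(parts, 2))
print(coalesce_merge(parts, 2))
```

Both paths end with the same number of partitions and the same global total; they differ only in which inputs get merged together, and therefore in how much data moves between workers.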
