spark-user mailing list archives

From DB Tsai <>
Subject Re: Optimizing reduce for 'huge' aggregated outputs.
Date Tue, 10 Jun 2014 06:12:00 GMT
Hi Nick,

How does reduce work? I thought that after reducing within each executor,
it would reduce in parallel across multiple executors instead of pulling
everything to the driver and reducing there.
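Plain `RDD.reduce` reduces each partition on the executors, but it then merges the per-partition results one at a time on the driver. The parallel, multi-level merge described in the question is what later Spark releases (1.3 onward, as far as I know) offer as `RDD.treeReduce(f, depth)`. The sketch below is a minimal local model of that idea. It assumes plain Scala collections, where each inner `Seq` stands in for one partition's output, and the `TreeReduceSketch` object and its `treeReduce` helper are hypothetical. They are not Spark's implementation.

```scala
// Local model of tree-style reduction. Plain Scala Seqs stand in for RDD
// partitions; this is an illustration, not Spark's treeReduce code.
object TreeReduceSketch {
  // Reduce each "partition" on its own, then merge the partial results
  // pairwise, level by level, until a single value remains.
  def treeReduce[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T = {
    // Level 0: one partial result per non-empty partition
    // (the work each executor task would do).
    var level: Seq[T] = partitions.filter(_.nonEmpty).map(_.reduce(f))
    require(level.nonEmpty, "empty collection")
    // Each round roughly halves the number of partial results, so they are
    // combined in about log2(n) rounds instead of one long sequential chain.
    while (level.size > 1) {
      level = level.grouped(2).map(_.reduce(f)).toVector
    }
    level.head
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3, 4), Seq(5), Seq(6, 7, 8))
    println(treeReduce(partitions)(_ + _)) // prints 36
  }
}
```

In the real distributed setting, the intermediate levels would run as tasks on executors, so the driver only receives the last few partial results.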


DB Tsai
My Blog:

On Mon, Jun 9, 2014 at 11:07 PM, Nick Pentreath
<> wrote:
> Can you key your RDD by some key and use reduceByKey? In fact, if you are
> merging a bunch of maps, you can create a set of (k, v) pairs in your
> mapPartitions and then reduceByKey using some merge function. The reduce
> will then happen in parallel on multiple nodes. You'll end up with just a
> single set of (k, v) pairs per partition, which you can reduce, or collect
> and merge, on the driver.
> On Tue, Jun 10, 2014 at 1:05 AM, Sung Hwan Chung <>
> wrote:
>> I suppose what I want is the memory efficiency of toLocalIterator and the
>> speed of collect. Is there any such thing?
>> On Mon, Jun 9, 2014 at 3:19 PM, Sung Hwan Chung <>
>> wrote:
>>> Hello,
>>> I noticed that the final reduce step happens on the driver node, with code
>>> that looks like the following:
>>> // rdd, doSomething and merge stand for the poster's own code
>>> val outputMap = rdd.mapPartitions(doSomething).reduce { (a, b) =>
>>>   a.merge(b)
>>> }
>>> Although the individual outputs from the mappers are small, over time the
>>> aggregated result outputMap can become huge (say, hundreds of millions of
>>> keys and values, reaching gigabytes).
>>> I noticed that, even with a lot of memory on the driver node, this process
>>> eventually becomes really slow. Say we have 100+ partitions: the first
>>> reduce is fast, but it gets progressively slower as more and more
>>> partition outputs are aggregated. Is this because the intermediate reduce
>>> output gets serialized and then deserialized every time?
>>> Ideally, since the reduce takes place on the same machine anyway, there
>>> would be no need for any serialization and deserialization: the incoming
>>> results would simply be aggregated into the final result. Is this
>>> possible?
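Nick's suggestion in this thread (create (k, v) pairs in mapPartitions, then merge them per key with reduceByKey) can be modelled locally as below. This is a minimal sketch under stated assumptions. Plain Scala `Seq`s stand in for RDD partitions, and the shuffle is simulated by bucketing pairs on a hash of the key. String keys, `Long` values, summing as the merge, and the names `KeyedMergeSketch`, `mergeFn`, `reduceByKeyLocal` and `mergeOnDriver` are all hypothetical. The driver-side step folds the already-reduced pairs into a single mutable map in place, which is one way to "just aggregate the incoming results" as the original question asks.

```scala
import scala.collection.mutable

// Local model of "emit (k, v) pairs, then reduceByKey". Plain Scala Seqs
// stand in for RDD partitions, and the shuffle is simulated by bucketing
// pairs on their key's hash. Not Spark code.
object KeyedMergeSketch {
  // Hypothetical per-key merge function (here: summing counts).
  val mergeFn: (Long, Long) => Long = _ + _

  // Simulated reduceByKey: route every pair to a bucket by key hash, then
  // merge values per key inside each bucket. Buckets are independent of
  // each other, so they could be merged in parallel.
  def reduceByKeyLocal(partitions: Seq[Seq[(String, Long)]],
                       numBuckets: Int): Seq[Map[String, Long]] = {
    val pairs = partitions.flatten
    (0 until numBuckets).map { bucket =>
      pairs
        .filter { case (k, _) => Math.floorMod(k.hashCode, numBuckets) == bucket }
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2).reduce(mergeFn) }
    }
  }

  // Driver side: stream already-reduced pairs into one mutable map,
  // updating it in place rather than building a new map on every merge.
  def mergeOnDriver(pairs: Iterator[(String, Long)]): mutable.HashMap[String, Long] = {
    val result = mutable.HashMap.empty[String, Long]
    for ((k, v) <- pairs) {
      result(k) = result.get(k).map(mergeFn(_, v)).getOrElse(v)
    }
    result
  }

  def main(args: Array[String]): Unit = {
    // Each inner Seq is one partition's output from mapPartitions.
    val partitions = Seq(
      Seq("a" -> 1L, "b" -> 2L),
      Seq("a" -> 3L, "c" -> 4L),
      Seq("b" -> 5L)
    )
    val buckets = reduceByKeyLocal(partitions, numBuckets = 2)
    val merged = mergeOnDriver(buckets.iterator.flatMap(_.iterator))
    println(merged.toSeq.sortBy(_._1)) // values: a = 4, b = 7, c = 4
  }
}
```

In Spark itself, the rough counterpart would be `rdd.mapPartitions(emitPairs).reduceByKey(mergeFn)` followed by `collect()` or `toLocalIterator` on the reduced RDD. Here `emitPairs` is a hypothetical function, and releases of this era also need `import org.apache.spark.SparkContext._` for the pair-RDD methods. Because each key is fully merged before it reaches the driver, the driver-side loop only inserts each key once.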
