spark-dev mailing list archives

From Mridul Muralidharan <>
Subject Re: Eliminate copy while sending data : any Akka experts here ?
Date Tue, 01 Jul 2014 09:51:24 GMT
We had considered both approaches (if I understood the suggestions right) :
a) Pulling only map output states for tasks which run on the reducer
by modifying the Actor. (Probably along the lines of what Aaron described.)
The performance implications of this were bad :
1) We can't cache the serialized result anymore (or rather, caching it makes no sense).
2) The number of requests to master will go from num_executors to
num_reducers - the latter can be orders of magnitude higher than
the former.

b) Instead of pulling this information, push it to executors as part
of task submission. (What Patrick mentioned ?)
(1) a.1 from above is still an issue for this.
(2) Serialized task size is also a concern : we have already seen
users hitting akka limits for task size - this will be an additional
vector which might exacerbate it.
Our jobs are not hitting this yet though !

I was hoping there might be something in akka itself to alleviate this
- but if not, we can solve it within context of spark.

Currently, we have worked around it by using a broadcast variable when
the serialized size is above some threshold - so that our immediate
concerns are unblocked :-)
But a better solution would be greatly welcomed !
Maybe we can unify it with the handling of large serialized tasks as well ...
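
To make the workaround concrete, here is a minimal sketch of the size-based fallback. It is written in Python for brevity and is not the actual Spark code: `StatusServer`, `InlineStatuses`, `BroadcastRef` and the threshold value are all hypothetical names and numbers chosen for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Union

# Hypothetical value; the thread only says "some threshold".
BROADCAST_THRESHOLD_BYTES = 1024 * 1024


@dataclass(frozen=True)
class InlineStatuses:
    """Small payload: returned directly in the reply message."""
    payload: bytes


@dataclass(frozen=True)
class BroadcastRef:
    """Large payload: the reply carries only an id, data is fetched separately."""
    broadcast_id: int


Reply = Union[InlineStatuses, BroadcastRef]


class StatusServer:
    """Toy stand-in for the master side (an assumption, not Spark's tracker)."""

    def __init__(self, threshold: int = BROADCAST_THRESHOLD_BYTES) -> None:
        self.threshold = threshold
        self._replies: Dict[int, Reply] = {}      # one cached reply per shuffle
        self._broadcasts: Dict[int, bytes] = {}   # bytes registered once per id

    def reply_for(self, shuffle_id: int, serialized: bytes) -> Reply:
        cached = self._replies.get(shuffle_id)
        if cached is not None:
            return cached
        if len(serialized) <= self.threshold:
            reply: Reply = InlineStatuses(serialized)
        else:
            # Register the bytes once; every requester gets the same small reference.
            broadcast_id = len(self._broadcasts)
            self._broadcasts[broadcast_id] = serialized
            reply = BroadcastRef(broadcast_id)
        self._replies[shuffle_id] = reply
        return reply

    def fetch(self, ref: BroadcastRef) -> bytes:
        """What a worker would call to pull the large payload."""
        return self._broadcasts[ref.broadcast_id]
```

The point of the sketch is only that the per-request message stays small once the payload crosses the threshold, so the messaging layer no longer copies the large array for each requester.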

Btw, I am not sure what the higher cost of BlockManager referred to is,
Aaron - do you mean the cost of persisting the serialized map outputs
to disk ?


On Tue, Jul 1, 2014 at 1:36 PM, Patrick Wendell <> wrote:
> Yeah I created a JIRA a while back to piggy-back the map status info
> on top of the task (I honestly think it will be a small change). There
> isn't a good reason to broadcast the entire array and it can be an
> issue during large shuffles.
> - Patrick
> On Mon, Jun 30, 2014 at 7:58 PM, Aaron Davidson <> wrote:
>> I don't know of any way to avoid Akka doing a copy, but I would like to
>> mention that it's on the priority list to piggy-back only the map statuses
>> relevant to a particular map task on the task itself, thus reducing the
>> total amount of data sent over the wire by a factor of N for N physical
>> machines in your cluster. Ideally we would also avoid Akka entirely when
>> sending the tasks, as these can get somewhat large and Akka doesn't work
>> well with large messages.
>> Do note that your solution of using broadcast to send the map tasks is very
>> similar to how the executor returns the result of a task when it's too big
>> for akka. We were thinking of refactoring this too, as using the block
>> manager has much higher latency than a direct TCP send.
>> On Mon, Jun 30, 2014 at 12:13 PM, Mridul Muralidharan <>
>> wrote:
>>> Our current hack is to use Broadcast variables when serialized
>>> statuses are above some (configurable) size : and have the workers
>>> directly pull them from master.
>>> This is a workaround : so would be great if there was a
>>> better/principled solution.
>>> Please note that the responses are going to different workers
>>> requesting for the output statuses for shuffle (after map) - so not
>>> sure if back pressure buffers, etc would help.
>>> Regards,
>>> Mridul
>>> On Mon, Jun 30, 2014 at 11:07 PM, Mridul Muralidharan <>
>>> wrote:
>>> > Hi,
>>> >
>>> >   While sending map output tracker result, the same serialized byte
>>> > array is sent multiple times - but the akka implementation copies it
>>> > to a private byte array within ByteString for each send.
>>> > Caching a ByteString instead of Array[Byte] did not help, since akka
>>> > does not support special casing ByteString : serializes the
>>> > ByteString, and copies the result out to an array before creating
>>> > ByteString out of it (in Array[Byte] serializing is thankfully simply
>>> > returning same array - so one copy only).
>>> >
>>> >
>>> > Given the need to send immutable data a large number of times, is there
>>> > any way to do it in akka without copying it internally ?
>>> >
>>> >
>>> > To see how expensive it is, for 200 nodes with a large number of
>>> > mappers and reducers, the status becomes something like 30 mb for us -
>>> > and pulling this about 200 to 300 times results in OOM due to the
>>> > large number of copies sent out.
>>> >
>>> >
>>> > Thanks,
>>> > Mridul
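
For a rough sense of scale, the figures in the original message (a status of about 30 MB, pulled about 200 to 300 times) can be multiplied out. The sketch below assumes the worst case where every reply holds its own copy and all copies are alive at the same time; how many actually coexist depends on send timing and garbage collection, which the thread does not specify.

```python
MB = 1024 * 1024


def copied_bytes(status_bytes: int, pulls: int) -> int:
    """Bytes allocated if every reply makes its own copy of the payload."""
    return status_bytes * pulls


status = 30 * MB                   # "something like 30 mb" from the message
low = copied_bytes(status, 200)    # lower end of "about 200 to 300 times"
high = copied_bytes(status, 300)   # upper end

print(f"{low / MB:.0f} MB to {high / MB:.0f} MB of copies")
# prints "6000 MB to 9000 MB of copies"
```

Under that assumption the copies alone add up to several gigabytes, which is consistent with the OOM described above.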
