spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sriram Ramachandrasekaran <sri.ram...@gmail.com>
Subject Re: executor failures w/ scala 2.10
Date Fri, 01 Nov 2013 04:58:59 GMT
Sorry if I my understanding is wrong. May be, for this particular case it
might be something to do with the load/network, but, in general, are you
saying that, we build these communication channels(block manager
communication, task events communication, etc) assuming akka would take
care of it? I somehow feel that, it's being overly optimistic. Correct me
if I am wrong.



On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <matei.zaharia@gmail.com>wrote:

> It’s true that Akka’s delivery guarantees are in general at-most-once, but
> if you look at the text there it says that they differ by transport. In the
> previous version, I’m quite sure that except maybe in very rare
> circumstances or cases where we had a bug, Akka’s remote layer always kept
> connections up between each pair of hosts. So the guarantee was that as
> long as you haven’t received a “disconnected” event, your messages are
> being delivered, though of course when you do receive that event you don’t
> know which messages have really made it through unless you acked them. But
> that didn’t matter for our use case — from our point of view an executor
> was either up or down.
>
> For this reason I still think it should be possible to configure Akka to
> do the same on 2.2. Most likely some timeouts just got lower. With large
> heaps you can easily get a GC pause of 60 seconds, so these timeouts should
> be in the minutes.
>
> If for some reason this isn’t the case, then we have a bigger problem —
> there are *lots* of messages beyond task-finished that need to be sent
> reliably, including things like block manager events (a block was added /
> removed on this node) and commands to tell the block manager to drop data.
> It would be silly to implement acks at the application level for all these.
> But I doubt this is the case. Prashant’s observation that the standalone
> cluster manager stayed up is a further sign that this might be due to GC.
>
> Matei
>
> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran <
> sri.rams85@gmail.com> wrote:
>
> Hi Imran,
> Just to add, we've noticed dis-associations in a couple projects that we
> built(using akka 2.2.x not spark). We went to some details to find out what
> was happening. As Matei, suggested, Akka keeps the TCP connection open and
> uses that to talk to peers. We noticed that in our case, initially, we were
> seeing dis-associations generally at the end of keep-alive duration. So,
> when the keep-alive duration ends, at the TCP layer, a keep-alive probe
> gets sent to inform the peer on the other side that the connection is still
> alive/valid. For some reason, the probe dint renew the keep-alive
> connection and we saw a lot of dis-associations during that time. Later, we
> realized this was not a pattern either. This thread<https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ>contains
the full history of our discussions with the Akka team. It's still
> open and unclear as to what was causing it for our case.
> We tried tweaking various settings of akka(wrt heartbeats, failure
> detector, even plugged-in our own failure detector with no effect).
>
> Imran - Just to clarify your point on message delivery - akka's message
> delivery policy is at-most-once. However, there's no guarantee for a
> message to be delivered to a peer. The documentation clearly explains that.
> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's
> the responsibility of the application developer to handle cases where
> message is suspected to be not have been delivered.
>
> I hope this helps.
>
>
>
>
> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <imran@quantifind.com> wrote:
>
>>
>> unfortunately that change wasn't the silver bullet I was hoping for.
>> Even with
>> 1) ignoring DisassociatedEvent
>> 2) executor uses ReliableProxy to send messages back to driver
>> 3) turn up akka.remote.watch-failure-detector.threshold=12
>>
>>
>> there is a lot of weird behavior.  First, there are a few
>> DisassociatedEvents, but some that are followed by AssociatedEvents, so
>> that seems ok.  But sometimes the re-associations are immediately followed
>> by this:
>>
>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got
>> lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441]
>> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address:
>> akka.tcp://spark@<driver>:41321] [
>> akka.remote.InvalidAssociation: Invalid address:
>> akka.tcp://spark@<driver>:41321
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
>> The remote system has quarantined this system. No further associations to
>> the remote system are possible until this system is restarted.
>> ]
>>
>> On the driver, there are messages like:
>>
>> [INFO] [10/31/2013 18:51:07.838] [spark-akka.actor.default-dispatcher-3]
>> [Remoting] Address [akka.tcp://sparkExecutor@<executor>:46123] is now
>> quarantined, all messages to this address will be delivered to dead letters.
>> [WARN] [10/31/2013 18:51:10.845] [spark-akka.actor.default-dispatcher-20]
>> [akka://spark/system/remote-watcher] Detected unreachable:
>> [akka.tcp://sparkExecutor@<executor>:41441]
>>
>>
>> and when the driver does decide that the executor has been terminated, it
>> removes the executor, but doesn't start another one.
>>
>> there are a ton of messages also about messages to the block manager
>> master ... I'm wondering if there are other parts of the system that need
>> to use a reliable proxy (or some sort of acknowledgement).
>>
>> I really don't think this was working properly even w/ previous versions
>> of spark / akka.  I'm still learning about akka, but I think you always
>> need an ack to be confident w/ remote communicate.  Perhaps the old version
>> of akka just had more robust defaults or something, but I bet it could
>> still have the same problems.  Even before, I have seen the driver thinking
>> there were running tasks, but nothing happening on any executor -- it was
>> just rare enough (and hard to reproduce) that I never bothered looking into
>> it more.
>>
>> I will keep digging ...
>>
>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <matei.zaharia@gmail.com>wrote:
>>
>>> BTW the problem might be the Akka failure detector settings that seem
>>> new in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>
>>> Their timeouts seem pretty aggressive by default — around 10 seconds.
>>> This can easily be too little if you have large garbage collections. We
>>> should make sure they are higher than our own node failure detection
>>> timeouts.
>>>
>>> Matei
>>>
>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <imran@quantifind.com> wrote:
>>>
>>> pretty sure I found the problem -- two problems actually.  And I think
>>> one of them has been a general lurking problem w/ spark for a while.
>>>
>>> 1)  we should ignore disassociation events, as you suggested earlier.
>>> They seem to just indicate a temporary problem, and can generally be
>>> ignored.  I've found that they're regularly followed by AssociatedEvents,
>>> and it seems communication really works fine at that point.
>>>
>>> 2) Task finished messages get lost.  When this message gets sent, we
>>> dont' know it actually gets there:
>>>
>>>
>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>
>>> (this is so incredible, I feel I must be overlooking something -- but
>>> there is no ack somewhere else that I'm overlooking, is there??)  So, after
>>> the patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent.
>>> It hangs b/c the executor has sent some taskFinished messages that never
>>> get received by the driver.  So the driver is waiting for some tasks to
>>> finish, but the executors think they are all done.
>>>
>>> I'm gonna add the reliable proxy pattern for this particular interaction
>>> and see if its fixes the problem
>>>
>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
>>>
>>> imran
>>>
>>>
>
>
> --
> It's just about how deep your longing is!
>
>
>


-- 
It's just about how deep your longing is!

Mime
View raw message