spark-user mailing list archives

From Imran Rashid <im...@quantifind.com>
Subject Re: executor failures w/ scala 2.10
Date Fri, 01 Nov 2013 20:09:20 GMT
I downgraded spark to akka 2.1.0, and everything seems to work now.  I'm
going to run my tests a few more times, but I'd really have expected to
see a failure by now w/ the 2.2.3 version.

I'll submit a patch shortly (need to fix some compile errors in streaming
still).

Matei -- I think I realize now that when you were talking about the
expectation of a tcp connection staying alive, you were explaining why this
is *not* a bug in the current release.  You wouldn't end up in a situation
where the executor thinks it finished the task, but the driver doesn't know
about it, b/c if the connection dies, the executor will get restarted.  That
makes sense.  But, it seems like if we upgrade to akka 2.2.x, a lot of
things change.  I was probably wrong about seeing that problem in previous
releases -- it was just a vague recollection, which fit my current
theories, so I jumped to conclusions.

thanks everyone



On Fri, Nov 1, 2013 at 9:27 AM, Imran Rashid <imran@quantifind.com> wrote:

> thanks everyone for all of the input.
>
> Matei: your explanation of spark's expected behavior of tcp helps a lot,
> I can see why this makes sense now.  But, to show my total
> ignorance here, I'm wondering: when the connection does break, are you
> sure all of the messages that you thought you sent before the break were
> received?  I'm guessing that you aren't.  Which is fine, if the response to
> that is to have the executor just die completely, and restart.  that was
> the behavior I was initially observing with the code on the 2.10 branch,
> where the executor handles a DisassociatedEvent explicitly, and dies.
>
> But -- is that the behavior we want?  do we want it to be robust to tcp
> connections breaking, without having to completely restart the executor?
> you might say that dying & restarting will lead to correct behavior, even
> if it's inefficient.  But sometimes, I've seen restarts so frequently that
> no progress is made.
>
> I don't see why this changed w/ the different versions of akka -- I don't
> see any relevant configuration settings that would change how "strongly"
> tcp tries to keep the connection alive, but I may be missing something.
> But it does seem like the netty configuration options have changed
> completely between the two versions:
> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#Remote_Configuration
> vs
> http://doc.akka.io/docs/akka/2.0.5/scala/remoting.html
>
> btw, akka 2.1.0 also has been built for scala 2.10:
>
> http://search.maven.org/#artifactdetails|com.typesafe.akka|akka-remote_2.10|2.1.0|bundle
> and its netty configuration is closer to 2.0.5:
> http://doc.akka.io/docs/akka/2.1.0/scala/remoting.html
>
> perhaps someone more knowledgeable than me about netty & tcp can look through
> the changes and decide what the right changes are.
>
> Prashant said:
> > Before we conclude something about reliable messaging, I want you to for
> > once consider other possibilities like actual network reconnection and
> > maybe a GC pause? Try connecting something like jconsole (or alike) and see
> > what happens on the driver and executor.
> >
> > My doubt is that since we are using standalone mode, where even master and
> > worker are also actors, if we see weird behaviour on the executor
> > and driver then why not on master and worker too? They should also break
> > away from each other. For this reason, I am doubting our conclusions, and
> > maybe we should narrow down the problem first before we conclude something.
> > It is a regression in akka 2.2.3: it uses more memory than it used to
> > in 2.1.x.
> > See https://github.com/akka/akka/issues/1810
>
>
> Well, there could easily be the same problem with dropped connections
> between master & worker -- they just communicate so little, it doesn't
> really matter.  The odds that a message gets dropped between them is very
> low, only because there are barely any messages.
>
> I completely agree that the problem could be because of contention, or
> gc pause, etc.  In fact, I'm only giving spark 24 out of 32 cores available
> on each box, and 90g out of 125g memory.  I've looked at gc a little with
> jstat, and I did see some gc pauses but nothing ridiculous.
>
> But, I think the question remains.  Suppose it is gc pauses, etc. that
> cause the disassociation events; what do we do to fix it?  How can we
> diagnose the problem, and figure out which of the configuration variables
> to tune?  clearly, there *will be* long gc pauses, and the networking layer
> needs to be able to deal with them.
>
> still I understand your desire to see if that might be the cause of the
> problem in this particular case, so I will dig a little more.
>
>
> (btw, should I move this thread to the dev list now?  it is getting into
> the nitty-gritty of implementation ...)
>
> On Fri, Nov 1, 2013 at 1:15 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>
>> Yes, so far they’ve been built on that assumption — not that Akka would
>> *guarantee* delivery in that as soon as the send() call returns you know
>> it’s delivered, but that Akka would act the same way as a TCP socket,
>> allowing you to send a stream of messages in order and hear when the
>> connection breaks. Maybe that isn’t what they want to provide, but I'd find
>> it weird, because it’s very easy to write a server with this property.
>>
>> Matei
>>
>> On Oct 31, 2013, at 9:58 PM, Sriram Ramachandrasekaran <
>> sri.rams85@gmail.com> wrote:
>>
>> Sorry if my understanding is wrong. Maybe, for this particular case, it
>> might be something to do with the load/network, but, in general, are you
>> saying that we build these communication channels (block manager
>> communication, task events communication, etc.) assuming akka would take
>> care of it? I somehow feel that it's being overly optimistic. Correct me
>> if I am wrong.
>>
>>
>>
>> On Fri, Nov 1, 2013 at 10:08 AM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>>
>>> It’s true that Akka’s delivery guarantees are in general at-most-once,
>>> but if you look at the text there it says that they differ by transport. In
>>> the previous version, I’m quite sure that except maybe in very rare
>>> circumstances or cases where we had a bug, Akka’s remote layer always kept
>>> connections up between each pair of hosts. So the guarantee was that as
>>> long as you haven’t received a “disconnected” event, your messages are
>>> being delivered, though of course when you do receive that event you don’t
>>> know which messages have really made it through unless you acked them. But
>>> that didn’t matter for our use case — from our point of view an executor
>>> was either up or down.
>>>
>>> For this reason I still think it should be possible to configure Akka to
>>> do the same on 2.2. Most likely some timeouts just got lower. With large
>>> heaps you can easily get a GC pause of 60 seconds, so these timeouts should
>>> be in the minutes.
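
A minimal sketch of what raising those timeouts could look like on Akka 2.2.x, assuming the failure-detector keys under `akka.remote.transport-failure-detector` and `akka.remote.watch-failure-detector` from Akka's remoting reference configuration. Only `watch-failure-detector.threshold` is mentioned in this thread; the other keys and every value below are illustrative assumptions that should be checked against the reference.conf of the Akka version in use. The object name `RemotingTimeoutSketch` is hypothetical.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object RemotingTimeoutSketch {
  // Hypothetical overrides: placeholder values chosen to exceed a ~60 s GC
  // pause. They are assumptions, not settings verified on a real cluster.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.remote.transport-failure-detector.heartbeat-interval = 10 s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 180 s
      |akka.remote.watch-failure-detector.heartbeat-interval = 10 s
      |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 180 s
      |akka.remote.watch-failure-detector.threshold = 12.0
    """.stripMargin)

  // Layer the overrides on top of application.conf / reference.conf defaults.
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting `config` could then be passed when creating the actor system, e.g. `ActorSystem("spark", RemotingTimeoutSketch.config)`.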
>>>
>>> If for some reason this isn’t the case, then we have a bigger problem —
>>> there are *lots* of messages beyond task-finished that need to be sent
>>> reliably, including things like block manager events (a block was added /
>>> removed on this node) and commands to tell the block manager to drop data.
>>> It would be silly to implement acks at the application level for all these.
>>> But I doubt this is the case. Prashant’s observation that the standalone
>>> cluster manager stayed up is a further sign that this might be due to GC.
>>>
>>> Matei
>>>
>>> On Oct 31, 2013, at 9:11 PM, Sriram Ramachandrasekaran <
>>> sri.rams85@gmail.com> wrote:
>>>
>>> Hi Imran,
>>> Just to add, we've noticed disassociations in a couple of projects that we
>>> built (using akka 2.2.x, not spark). We went into some detail to find out what
>>> was happening. As Matei suggested, Akka keeps the TCP connection open and
>>> uses that to talk to peers. We noticed that in our case, initially, we were
>>> seeing disassociations generally at the end of the keep-alive duration. So,
>>> when the keep-alive duration ends, at the TCP layer, a keep-alive probe
>>> gets sent to inform the peer on the other side that the connection is still
>>> alive/valid. For some reason, the probe didn't renew the keep-alive
>>> connection and we saw a lot of disassociations during that time. Later, we
>>> realized this was not a pattern either. This thread
>>> <https://groups.google.com/forum/#!msg/akka-user/RYxaPl_nby4/1USHDFIRgOkJ>
>>> contains the full history of our discussions with the Akka team. It's still
>>> open and unclear as to what was causing it for our case.
>>> We tried tweaking various settings of akka (wrt heartbeats, failure
>>> detector, even plugged in our own failure detector) with no effect.
>>>
>>> Imran - Just to clarify your point on message delivery - akka's message
>>> delivery policy is at-most-once. That is, there's no guarantee for a
>>> message to be delivered to a peer. The documentation clearly explains that:
>>> http://doc.akka.io/docs/akka/2.0.2/general/message-send-semantics.html. It's
>>> the responsibility of the application developer to handle cases where
>>> a message is suspected not to have been delivered.
>>>
>>> I hope this helps.
>>>
>>>
>>>
>>>
>>> On Fri, Nov 1, 2013 at 8:35 AM, Imran Rashid <imran@quantifind.com> wrote:
>>>
>>>>
>>>> unfortunately that change wasn't the silver bullet I was hoping for.
>>>> Even with
>>>> 1) ignoring DisassociatedEvent
>>>> 2) executor uses ReliableProxy to send messages back to driver
>>>> 3) turn up akka.remote.watch-failure-detector.threshold=12
>>>>
>>>>
>>>> there is a lot of weird behavior.  First, there are a few
>>>> DisassociatedEvents, but some that are followed by AssociatedEvents, so
>>>> that seems ok.  But sometimes the re-associations are immediately followed
>>>> by this:
>>>>
>>>> 13/10/31 18:51:10 INFO executor.StandaloneExecutorBackend: got
>>>> lifecycleevent: AssociationError [akka.tcp://sparkExecutor@<executor>:41441]
>>>> -> [akka.tcp://spark@<driver>:41321]: Error [Invalid address:
>>>> akka.tcp://spark@<driver>:41321] [
>>>> akka.remote.InvalidAssociation: Invalid address:
>>>> akka.tcp://spark@<driver>:41321
>>>> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
>>>> The remote system has quarantined this system. No further associations to
>>>> the remote system are possible until this system is restarted.
>>>> ]
>>>>
>>>> On the driver, there are messages like:
>>>>
>>>> [INFO] [10/31/2013 18:51:07.838]
>>>> [spark-akka.actor.default-dispatcher-3] [Remoting] Address [
>>>> akka.tcp://sparkExecutor@<executor>:46123] is now quarantined, all
>>>> messages to this address will be delivered to dead letters.
>>>> [WARN] [10/31/2013 18:51:10.845]
>>>> [spark-akka.actor.default-dispatcher-20] [akka://spark/system/remote-watcher]
>>>> Detected unreachable: [akka.tcp://sparkExecutor@<executor>:41441]
>>>>
>>>>
>>>> and when the driver does decide that the executor has been terminated,
>>>> it removes the executor, but doesn't start another one.
>>>>
>>>> there are a ton of messages also about messages to the block manager
>>>> master ... I'm wondering if there are other parts of the system that need
>>>> to use a reliable proxy (or some sort of acknowledgement).
>>>>
>>>> I really don't think this was working properly even w/ previous
>>>> versions of spark / akka.  I'm still learning about akka, but I think you
>>>> always need an ack to be confident w/ remote communication.  Perhaps the old
>>>> version of akka just had more robust defaults or something, but I bet it
>>>> could still have the same problems.  Even before, I have seen the driver
>>>> thinking there were running tasks, but nothing happening on any executor --
>>>> it was just rare enough (and hard to reproduce) that I never bothered
>>>> looking into it more.
>>>>
>>>> I will keep digging ...
>>>>
>>>> On Thu, Oct 31, 2013 at 4:36 PM, Matei Zaharia <matei.zaharia@gmail.com> wrote:
>>>>
>>>>> BTW the problem might be the Akka failure detector settings that seem
>>>>> new in 2.2: http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html
>>>>>
>>>>> Their timeouts seem pretty aggressive by default — around 10 seconds.
>>>>> This can easily be too little if you have large garbage collections. We
>>>>> should make sure they are higher than our own node failure detection
>>>>> timeouts.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Oct 31, 2013, at 1:33 PM, Imran Rashid <imran@quantifind.com> wrote:
>>>>>
>>>>> pretty sure I found the problem -- two problems actually.  And I think
>>>>> one of them has been a general lurking problem w/ spark for a while.
>>>>>
>>>>> 1)  we should ignore disassociation events, as you suggested earlier.
>>>>> They seem to just indicate a temporary problem, and can generally be
>>>>> ignored.  I've found that they're regularly followed by AssociatedEvents,
>>>>> and it seems communication really works fine at that point.
>>>>>
>>>>> 2) Task finished messages get lost.  When this message gets sent, we
>>>>> don't know it actually gets there:
>>>>>
>>>>>
>>>>> https://github.com/apache/incubator-spark/blob/scala-2.10/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala#L90
>>>>>
>>>>> (this is so incredible, I feel I must be overlooking something -- but
>>>>> there is no ack somewhere else that I'm overlooking, is there??)  So, after
>>>>> the patch, spark wasn't hanging b/c of the unhandled DisassociatedEvent.
>>>>> It hangs b/c the executor has sent some taskFinished messages that never
>>>>> get received by the driver.  So the driver is waiting for some tasks to
>>>>> finish, but the executors think they are all done.
>>>>>
>>>>> I'm gonna add the reliable proxy pattern for this particular
>>>>> interaction and see if it fixes the problem
>>>>>
>>>>> http://doc.akka.io/docs/akka/2.2.3/contrib/reliable-proxy.html#introducing-the-reliable-proxy
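
The idea behind acknowledging and resending task-finished messages can be sketched in plain Scala, without Akka. This is not the ReliableProxy implementation and not Spark's code; `Envelope`, `Receiver`, `Sender` and the `link` function are hypothetical names, and the unreliable channel is modelled as a function that returns `None` when a message or its ack is lost.

```scala
import scala.collection.mutable

object AckRetrySketch {
  // Hypothetical message type: a sequence number plus a payload such as a task id.
  final case class Envelope(seq: Long, payload: String)

  // Receiver side: processes each sequence number at most once and always acks,
  // so a resend of an already-seen message is acknowledged but not reprocessed.
  final class Receiver {
    private val seen = mutable.Set.empty[Long]
    val delivered = mutable.ArrayBuffer.empty[String]

    def receive(env: Envelope): Long = {
      if (seen.add(env.seq)) delivered += env.payload
      env.seq // the returned sequence number plays the role of the ack
    }
  }

  // Sender side: keeps every message until its ack arrives and resends the rest.
  // `link` models an unreliable channel: Some(ack) on success, None on loss.
  final class Sender(link: Envelope => Option[Long]) {
    private val pending = mutable.LinkedHashMap.empty[Long, Envelope]
    private var nextSeq = 0L

    def send(payload: String): Unit = {
      val env = Envelope(nextSeq, payload)
      nextSeq += 1
      pending(env.seq) = env
      attempt(env)
    }

    // Meant to be called periodically (e.g. from a timer) to retry unacked messages.
    def resendPending(): Unit = pending.values.toList.foreach(attempt)

    def unacked: Int = pending.size

    private def attempt(env: Envelope): Unit =
      link(env).foreach(ack => pending.remove(ack))
  }
}
```

With this shape the sender can tell that a message has not been confirmed (`unacked > 0`) instead of assuming it arrived. Note that this sketch does not preserve ordering across retries, which a real reliable channel would need to handle.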
>>>>>
>>>>> imran
>>>>>
>>>>>
>>>
>>>
>>> --
>>> It's just about how deep your longing is!
>>>
>>>
>>>
>>
>>
>> --
>> It's just about how deep your longing is!
>>
>>
