spark-user mailing list archives

From Prashant Sharma <scrapco...@gmail.com>
Subject Re: executor failures w/ scala 2.10
Date Fri, 01 Nov 2013 03:54:30 GMT
Hi Imran,

Thanks for your patience and testing, see inline.

On Thu, Oct 31, 2013 at 11:47 PM, Imran Rashid <imran@quantifind.com> wrote:

> Hi Prashant,
>
> thanks for looking into this.  I don't have any answers yet, but just
> wanted to send you an update.  I finally figured out how to get all the
> akka logging turned on, so I'm looking at those for more info.  One thing
> immediately jumped out at me -- the Disassociation is actually immediately
> followed by an Association!  So maybe I came to the wrong conclusion from our
> test of ignoring the DisassociatedEvent.
>
Before we conclude anything about reliable messaging, I want you to also
consider other possibilities, like an actual network reconnection or maybe a
GC pause. Try connecting something like jconsole (or a similar tool) and see
what happens on the driver and executor.

My doubt is this: since we are using standalone mode, where the master and
worker are also actors, if we see this weird behaviour between the executor and
driver, then why not between the master and worker too? They should also break
away from each other. For this reason I am doubting our conclusions, and I would
rather we narrow down the problem first before we conclude anything. There is
also a regression in akka 2.2.3: it uses more memory than it did in 2.1.x. See
https://github.com/akka/akka/issues/1810
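For example, to make long GC pauses visible and to allow attaching jconsole
remotely, one option (a sketch only; these are standard JVM/JMX options rather
than anything Spark-specific, and the port is arbitrary) is to add something
like the following to SPARK_JAVA_OPTS on the driver and workers:

    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
    -Dcom.sun.management.jmxremote
    -Dcom.sun.management.jmxremote.port=9999
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false

If a multi-second GC pause shows up right before a Disassociation, that would
point at the memory regression above rather than at the network.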


> I'm going to try it again -- hopefully w/ the logging on, I can find out
> more about what is going on.  I might ask on akka list for help w/ what to
> look for.  also this thread makes me think that it really should just
> re-associate:
>
> https://groups.google.com/forum/#!searchin/akka-user/Disassociated/akka-user/SajwwbyTriQ/8oxjbZtawxoJ
>
> also, I've noticed that actually these failures are *not* more likely the
> longer the job runs (or not that much, anyway).  when I reorder our jobs, I
> can get the failures pretty regularly within 10 minutes.  It seems to be
> more of a function of what the jobs do.  I'll also see if I can produce a
> simple example which exhibits the same problem.
>
As pointed out on that thread, one possibility is that we are hogging too
many threads. I guess the systems you are using are really powerful, but even
so, try leaving one or more CPUs free on each worker node. (Of course this is
not my suggested solution, but a way to narrow down whether hogging threads
might be the issue.)
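A sketch of how to do that in standalone mode (assuming 24-core worker boxes,
as in the logs below; SPARK_WORKER_CORES just caps how many cores the worker
advertises):

    # conf/spark-env.sh on each worker node
    export SPARK_WORKER_CORES=23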

I have got some hints now; I will think about it more.

thanks!
>
> Imran
>
>
>
> On Thu, Oct 31, 2013 at 1:37 AM, Prashant Sharma <scrapcodes@gmail.com>wrote:
>
>> Hi Imran,
>>
>> So now my jobs have been running for more than 18 hours and there has not been
>> a single executor failure. I can keep them running longer, but I don't think
>> that is going to help. So either you have to give me a way to reproduce the
>> issue (which would be great!!) or you have to diagnose for yourself what
>> exactly is causing it. A disconnection usually results in a Disassociation
>> event, on which we terminate the executor. In normal operation this
>> usually means that the driver has finished, and those log messages are totally
>> fine in that case. I am not sure why you would not see such a thing in the 0.8
>> release but do on scala 2.10; I can't say anything until I can see what is
>> happening.
>>
>>
>>
>> On Thu, Oct 31, 2013 at 4:41 AM, Prashant Sharma <scrapcodes@gmail.com>wrote:
>>
>>> Can you show us the sample job? Do you call sc.stop at the end, or
>>> System.exit? Try sc.stop too.
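For reference, a minimal sketch of a driver that ends with sc.stop (the master
URL and the job body are placeholders):

    import org.apache.spark.SparkContext

    object ExampleJob {
      def main(args: Array[String]) {
        val sc = new SparkContext("spark://master:7077", "ExampleJob")
        try {
          sc.parallelize(1 to 1000).map(_ * 2).count()  // placeholder work
        } finally {
          sc.stop()  // shut the driver down cleanly instead of calling System.exit
        }
      }
    }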
>>>
>>>
>>> On Wed, Oct 30, 2013 at 10:42 PM, Imran Rashid <imran@quantifind.com>wrote:
>>>
>>>> yeah, just causes them to hang.
>>>>
>>>> the first "deadLetters" message shows up about the same time.  Oddly,
>>>> after it first happens, I keep getting some results trickling in from those
>>>> executors.  (maybe they were just queued up on the driver already, I
>>>> dunno.)  but then it just hangs.  the stage has a few more tasks to be run,
>>>> but the executors are just idle, they're not running anything.
>>>>
>>>> I'm gonna try manually listening for more Association events listed
>>>> here & logging them
>>>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#remote-events
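A sketch of that subscription with the akka 2.2 API from the link above (the
names here are made up; in spark you would subscribe on the already-created
"spark" / "sparkExecutor" actor system rather than a fresh one):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.remote.RemotingLifecycleEvent

    // Logs every remoting lifecycle event published to the event stream
    // (AssociatedEvent, DisassociatedEvent, AssociationErrorEvent, ...).
    class RemoteEventLogger extends Actor {
      def receive = {
        case ev: RemotingLifecycleEvent => println("remoting event: " + ev)
      }
    }

    object SubscribeToRemoteEvents {
      def main(args: Array[String]) {
        val system = ActorSystem("demo")  // stand-in for the existing actor system
        val listener = system.actorOf(Props[RemoteEventLogger], "remote-event-logger")
        system.eventStream.subscribe(listener, classOf[RemotingLifecycleEvent])
      }
    }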
>>>>
>>>> imran
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Oct 30, 2013 at 11:27 AM, Prashant Sharma <scrapcodes@gmail.com
>>>> > wrote:
>>>>
>>>>> I am guessing something is wrong with using the Disassociation event then.
>>>>>
>>>>> Try applying something along the lines of this patch. It might cause
>>>>> the executors to hang, so be prepared for that.
>>>>>
>>>>> diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>>>> index 4e8052a..1ec5d19 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>>>> @@ -74,9 +74,13 @@ private[spark] class StandaloneExecutorBackend(
>>>>>          executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
>>>>>        }
>>>>>
>>>>> -    case DisassociatedEvent(_, _, _) =>
>>>>> -      logError("Driver terminated or disconnected! Shutting down.")
>>>>> +    case Terminated(actor) =>
>>>>> +      logError("Driver terminated Shutting down.")
>>>>>        System.exit(1)
>>>>> +
>>>>> +    // case DisassociatedEvent(_, _, _) =>
>>>>> +    //   logError("Driver terminated or disconnected! Shutting down.")
>>>>> +    //   System.exit(1)
>>>>>    }
>>>>>
>>>>>    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
>>>>> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> index b6f0ec9..9955484 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> @@ -102,8 +102,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>>>        case Terminated(actor) =>
>>>>>          actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
>>>>>
>>>>> -      case DisassociatedEvent(_, remoteAddress, _) =>
>>>>> -        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
>>>>> +      // case DisassociatedEvent(_, remoteAddress, _) =>
>>>>> +      //   addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
>>>>>
>>>>>        case AssociationErrorEvent(_, _, remoteAddress, _) =>
>>>>>          addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
>>>>> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>>>      // Remove a disconnected slave from the cluster
>>>>>      def removeExecutor(executorId: String, reason: String) {
>>>>>        if (executorActor.contains(executorId)) {
>>>>> -        logInfo("Executor " + executorId + " disconnected, so removing it")
>>>>> +        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
>>>>>          val numCores = freeCores(executorId)
>>>>>          actorToExecutorId -= executorActor(executorId)
>>>>>          addressToExecutorId -= executorAddress(executorId)
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Oct 30, 2013 at 9:42 PM, Imran Rashid <imran@quantifind.com>wrote:
>>>>>
>>>>>> ok, so I applied a few patches
>>>>>>
>>>>>> https://github.com/quantifind/incubator-spark/pull/1/files
>>>>>>
>>>>>> and ran it again, with these options:
>>>>>>
>>>>>> -Dspark.akka.stdout-loglevel=DEBUG \
>>>>>>   -Dspark.akkaExtra.akka.logLevel=DEBUG\
>>>>>>   -Dspark.akkaExtra.akka.actor.debug.receive=on \
>>>>>> -Dspark.akkaExtra.akka.actor.debug.autoreceive=on \
>>>>>>   -Dspark.akkaExtra.akka.actor.debug.lifecycle=on \
>>>>>>   -Dspark.akkaExtra.akka.remote.log-sent-messages=on \
>>>>>>   -Dspark.akkaExtra.akka.remote.log-received-messages=on\
>>>>>>   -Dspark.akkaExtra.akka.log-config-on-start=on
>>>>>>
>>>>>> On the driver, I see:
>>>>>>
>>>>>> 2013-10-30 08:44:31,034 [spark-akka.actor.default-dispatcher-19]
>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>> Actor[akka://spark/deadLetters] to
>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.64%3A52400-2#-837892141]
>>>>>> was not delivered. [1] dead letters encountered. This logging can be turned
>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>
>>>>>> 2013-10-30 08:44:31,058 [spark-akka.actor.default-dispatcher-13]
>>>>>> INFO  org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend -
>>>>>> Executor 1 disconnected, so removing it, reason:remote Akka client
>>>>>> disconnected
>>>>>>
>>>>>> 2013-10-30 08:44:31,059 [spark-akka.actor.default-dispatcher-13]
>>>>>> ERROR org.apache.spark.scheduler.cluster.ClusterScheduler - Lost executor 1
>>>>>> on dhd2.quantifind.com: remote Akka client disconnected
>>>>>>
>>>>>>
>>>>>> on the worker, stderr:
>>>>>>
>>>>>> 13/10/30 08:44:28 INFO executor.Executor: Finished task ID 934
>>>>>>
>>>>>> 13/10/30 08:44:31 ERROR executor.StandaloneExecutorBackend: Driver
>>>>>> terminated or disconnected! Shutting down.Disassociated [akka.tcp://
>>>>>> sparkExecutor@dhd2.quantifind.com:38021] -> [akka.tcp://
>>>>>> spark@ddd0.quantifind.com:36730]
>>>>>>
>>>>>> and unfortunately, all those akka debug options give me *no* useful
>>>>>> info in the worker stdout:
>>>>>>
>>>>>> Starting akka system "sparkExecutor" using config:
>>>>>>
>>>>>>       akka.daemonic = on
>>>>>>       akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
>>>>>>       akka.stdout-loglevel = "DEBUG"
>>>>>>       akka.actor.provider = "akka.remote.RemoteActorRefProvider"
>>>>>>       akka.remote.netty.tcp.transport-class =
>>>>>> "akka.remote.transport.netty.NettyTransport"
>>>>>>       akka.remote.netty.tcp.hostname = "dhd2.quantifind.com"
>>>>>>       akka.remote.netty.tcp.port = 0
>>>>>>       akka.remote.netty.tcp.connection-timeout = 60 s
>>>>>>       akka.remote.netty.tcp.maximum-frame-size = 10MiB
>>>>>>       akka.remote.netty.tcp.execution-pool-size = 4
>>>>>>       akka.actor.default-dispatcher.throughput = 15
>>>>>>       akka.remote.log-remote-lifecycle-events = off
>>>>>>       akka.remote.log-sent-messages = on
>>>>>>       akka.remote.log-received-messages = on
>>>>>>       akka.logLevel = DEBUG
>>>>>>       akka.actor.debug.autoreceive = on
>>>>>>       akka.actor.debug.lifecycle = on
>>>>>>       akka.actor.debug.receive = on
>>>>>>       akka.log-config-on-start = on
>>>>>>       akka.remote.quarantine-systems-for = off
>>>>>> [DEBUG] [10/30/2013 08:40:30.230] [main] [EventStream]
>>>>>> StandardOutLogger started
>>>>>> [DEBUG] [10/30/2013 08:40:30.438]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
>>>>>> started (akka.actor.LocalActorRefProvider$Guardian@4bf54c5f)
>>>>>> [DEBUG] [10/30/2013 08:40:30.446]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
>>>>>> started (akka.actor.LocalActorRefProvider$Guardian@72608760)
>>>>>> [DEBUG] [10/30/2013 08:40:30.447]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>>>> [akka://sparkExecutor/system] started
>>>>>> (akka.actor.LocalActorRefProvider$SystemGuardian@1f57ea4a)
>>>>>> [DEBUG] [10/30/2013 08:40:30.454]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
now
>>>>>> supervising Actor[akka://sparkExecutor/user]
>>>>>> [DEBUG] [10/30/2013 08:40:30.454]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
now
>>>>>> supervising Actor[akka://sparkExecutor/system]
>>>>>> [DEBUG] [10/30/2013 08:40:30.468]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
>>>>>> now monitoring Actor[akka://sparkExecutor/system]
>>>>>> [DEBUG] [10/30/2013 08:40:30.468]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>>>> [akka://sparkExecutor/system] now monitoring Actor[akka://sparkExecutor/]
>>>>>> [DEBUG] [10/30/2013 08:40:30.476]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-3]
>>>>>> [akka://sparkExecutor/system/log1-Slf4jLogger] started
>>>>>> (akka.event.slf4j.Slf4jLogger@24988707)
>>>>>> [DEBUG] [10/30/2013 08:40:30.477]
>>>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>>>> [akka://sparkExecutor/system] now supervising
>>>>>> Actor[akka://sparkExecutor/system/log1-Slf4jLogger#719056881]
>>>>>>
>>>>>> (followed by similar messages for the "spark" system)
>>>>>>
>>>>>> I dunno if this means much more to you, but it seems to me that for
>>>>>> some reason the executor decides to disconnect from the master --
>>>>>> unfortunately we don't know why.  I think my logging configuration is not
>>>>>> getting applied correctly, or "log-sent-messages" & "log-received-messages"
>>>>>> don't do what I think they do ... something conflicting must be turning that
>>>>>> logging off.  There are a zillion different remoting settings:
>>>>>> http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
>>>>>>
>>>>>> I feel like I really need to get those messages on why it
>>>>>> disconnected to know which ones to play with.  Any ideas for config changes
>>>>>> to see those messages?
>>>>>>
>>>>>> thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 30, 2013 at 10:09 AM, Prashant Sharma <
>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>
>>>>>>> Can you apply this patch too and check the logs of the driver and worker.
>>>>>>>
>>>>>>> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>>>> index b6f0ec9..ad0ebf7 100644
>>>>>>> --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>>>> +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>>>> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>>>>>      // Remove a disconnected slave from the cluster
>>>>>>>      def removeExecutor(executorId: String, reason: String) {
>>>>>>>        if (executorActor.contains(executorId)) {
>>>>>>> -        logInfo("Executor " + executorId + " disconnected, so removing it")
>>>>>>> +        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
>>>>>>>          val numCores = freeCores(executorId)
>>>>>>>          actorToExecutorId -= executorActor(executorId)
>>>>>>>          addressToExecutorId -= executorAddress(executorId)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 30, 2013 at 8:18 PM, Imran Rashid <imran@quantifind.com>wrote:
>>>>>>>
>>>>>>>> I just realized something about the failing stages -- they
>>>>>>>> generally occur in steps like this:
>>>>>>>>
>>>>>>>> rdd.mapPartitions{itr =>
>>>>>>>>   val myCounters = initializeSomeDataStructure()
>>>>>>>>   itr.foreach{
>>>>>>>>     //update myCounter in here
>>>>>>>>     ...
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   myCounters.iterator.map{
>>>>>>>>     //some other transformation here ...
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> that is, as a partition is processed, nothing gets output, we just
>>>>>>>> accumulate some values.  Only at the end of the partition do we output some
>>>>>>>> accumulated values.
>>>>>>>>
>>>>>>>> These stages don't always fail, and generally they do succeed after
>>>>>>>> the executor has died and a new one has started -- so I'm pretty confident
>>>>>>>> it's not a problem w/ the code.  But maybe we need to add something like a
>>>>>>>> periodic heartbeat in this kind of operation?
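For reference, a self-contained version of that job shape (the object name,
the local master, and the counting logic are all illustrative; only the
accumulate-per-partition-then-emit structure matches the description above):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // for reduceByKey on pair RDDs
    import scala.collection.mutable

    object AccumulateThenEmit {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[4]", "AccumulateThenEmit")
        val counts = sc.parallelize(1 to 1000000, 100).mapPartitions { itr =>
          // accumulate within the partition; nothing is emitted yet
          val myCounters = mutable.Map.empty[Int, Long].withDefaultValue(0L)
          itr.foreach { x => myCounters(x % 10) += 1 }
          // only at the end of the partition do we output the accumulated values
          myCounters.iterator
        }.reduceByKey(_ + _).collect()
        println(counts.mkString(", "))
        sc.stop()
      }
    }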
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <imran@quantifind.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> I'm gonna try turning on more akka debugging msgs as described at
>>>>>>>>> http://akka.io/faq/
>>>>>>>>> and
>>>>>>>>> http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations
>>>>>>>>>
>>>>>>>>> unfortunately that will require a patch to spark, but hopefully
>>>>>>>>> that will give us more info to go on ...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma <
>>>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I have had things running (from the scala 2.10 branch) for over 3-4 hours
>>>>>>>>>> now without a problem, and my jobs write about the same amount of data as
>>>>>>>>>> you suggested. My cluster size is 7 nodes and it is not *congested* for
>>>>>>>>>> memory. I am going to leave the jobs running all night long. Meanwhile I
>>>>>>>>>> would encourage you to try to spot the problem in a way that is
>>>>>>>>>> reproducible; that can help a ton in fixing the issue.
>>>>>>>>>>
>>>>>>>>>> Thanks for testing and reporting your experience. I still feel
>>>>>>>>>> there is something else wrong! About tolerance for network connection
>>>>>>>>>> timeouts, setting those properties should work, but I am worried about the
>>>>>>>>>> Disassociation event though. I will have to check; this is indeed a hard to
>>>>>>>>>> reproduce bug, if it is one -- I mean, how do I simulate network delays?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <
>>>>>>>>>> imran@quantifind.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a spark-standalone setup (not mesos), on our own cluster.
>>>>>>>>>>>
>>>>>>>>>>> At first I thought it must be some temporary network problem too
>>>>>>>>>>> -- but the times between receiving task completion events from an executor
>>>>>>>>>>> and declaring it failed are really small, so I didn't think that could
>>>>>>>>>>> possibly be it.  Plus we tried increasing various akka timeouts, but that
>>>>>>>>>>> didn't help.  Or maybe there are some other spark / akka properties we
>>>>>>>>>>> should be setting?  It certainly should be resilient to such a temporary
>>>>>>>>>>> network issue, if that is the problem.
>>>>>>>>>>>
>>>>>>>>>>> btw, I think I've noticed this happens most often during
>>>>>>>>>>> ShuffleMapTasks.  The tasks write out very small amounts of data (64 MB
>>>>>>>>>>> total for the entire stage).
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma
<
>>>>>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you using mesos? I admit to not having properly tested
>>>>>>>>>>>> things on mesos though.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 30, 2013 at 11:31 AM, Prashant
Sharma <
>>>>>>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Those log messages are new in Akka 2.2 and are usually
>>>>>>>>>>>>> seen when one node is disassociated from another, either by a network
>>>>>>>>>>>>> failure or even a clean shutdown. This suggests some network issue to me;
>>>>>>>>>>>>> are you running on EC2? It might be a temporary thing in that case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to have more details on the long jobs though -- how long?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 30, 2013 at 1:29 AM, Imran
Rashid <
>>>>>>>>>>>>> imran@quantifind.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> We've been testing out the 2.10 branch of spark, and we're
>>>>>>>>>>>>>> running into some issues where akka disconnects from the executors after a
>>>>>>>>>>>>>> while.  We ran some simple tests first, and all was well, so we started
>>>>>>>>>>>>>> upgrading our whole codebase to 2.10.  Everything seemed to be working, but
>>>>>>>>>>>>>> then we noticed that when we run long jobs, things start failing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The first suspicious thing is that we get akka warnings about
>>>>>>>>>>>>>> undeliverable messages sent to deadLetters:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:54,577
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-17]
INFO  akka.actor.LocalActorRef -
>>>>>>>>>>>>>> Message
>>>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from
>>>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>>>>>>>> was not delivered. [4] dead letters
encountered. This logging can be turned
>>>>>>>>>>>>>> off or adjusted with configuration
settings 'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:54,579
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-19]
INFO  akka.actor.LocalActorRef -
>>>>>>>>>>>>>> Message [akka.remote.transport.AssociationHandle$Disassociated]
from
>>>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>>>>>>>> was not delivered. [5] dead letters
encountered. This logging can be turned
>>>>>>>>>>>>>> off or adjusted with configuration
settings 'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Generally within a few seconds after the first such message,
>>>>>>>>>>>>>> there are a bunch more, and then the executor is marked as failed, and a
>>>>>>>>>>>>>> new one is started:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:58,775
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-3]
INFO  akka.actor.LocalActorRef -
>>>>>>>>>>>>>> Message
>>>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from
>>>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%
>>>>>>>>>>>>>> 40dhd2.quantifind.com%3A45794-6#-890135716]
was not
>>>>>>>>>>>>>> delivered. [10] dead letters encountered,
no more dead letters will be
>>>>>>>>>>>>>> logged. This logging can be turned
off or adjusted with configuration
>>>>>>>>>>>>>> settings 'akka.log-dead-letters'
and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:58,778
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-17]
INFO
>>>>>>>>>>>>>> org.apache.spark.deploy.client.Client$ClientActor
- Executor updated:
>>>>>>>>>>>>>> app-20131029110000-0000/1 is now
FAILED (Command exited with code 1)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:58,784
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-17]
INFO
>>>>>>>>>>>>>> org.apache.spark.deploy.client.Client$ClientActor
- Executor added:
>>>>>>>>>>>>>> app-20131029110000-0000/2 on
>>>>>>>>>>>>>> worker-20131029105824-dhd2.quantifind.com-51544
(
>>>>>>>>>>>>>> dhd2.quantifind.com:51544) with 24
cores
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2013-10-29 11:03:58,784
>>>>>>>>>>>>>> [spark-akka.actor.default-dispatcher-18]
ERROR akka.remote.EndpointWriter -
>>>>>>>>>>>>>> AssociationError [akka.tcp://spark@ddd0.quantifind.com:43068]
>>>>>>>>>>>>>> -> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]:
>>>>>>>>>>>>>> Error [Association failed with [akka.tcp://
>>>>>>>>>>>>>> sparkExecutor@dhd2.quantifind.com:45794]]
[
>>>>>>>>>>>>>> akka.remote.EndpointAssociationException:
Association failed
>>>>>>>>>>>>>> with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]
>>>>>>>>>>>>>> Caused by:
>>>>>>>>>>>>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>>>>>>>>>>>>> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking in the logs of the failed executor, there are some
>>>>>>>>>>>>>> similar messages about undeliverable messages, but I don't see any reason:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:52 INFO executor.Executor:
Finished task ID 943
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef:
Message
>>>>>>>>>>>>>> [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters]
to
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548]
was not
>>>>>>>>>>>>>> delivered. [1] dead letters encountered.
This logging can be turned off or
>>>>>>>>>>>>>> adjusted with configuration settings
'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef:
Message
>>>>>>>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated]
from
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters]
to
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548]
was not
>>>>>>>>>>>>>> delivered. [2] dead letters encountered.
This logging can be turned off or
>>>>>>>>>>>>>> adjusted with configuration settings
'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef:
Message
>>>>>>>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated]
from
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters]
to
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548]
was not
>>>>>>>>>>>>>> delivered. [3] dead letters encountered.
This logging can be turned off or
>>>>>>>>>>>>>> adjusted with configuration settings
'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend:
>>>>>>>>>>>>>> Driver terminated or disconnected!
Shutting down.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef:
Message
>>>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters]
to
>>>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%
>>>>>>>>>>>>>> 40ddd0.quantifind.com%3A43068-1#772172548]
was not
>>>>>>>>>>>>>> delivered. [4] dead letters encountered.
This logging can be turned off or
>>>>>>>>>>>>>> adjusted with configuration settings
'akka.log-dead-letters' and
>>>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> After this happens, spark does launch a new executor
>>>>>>>>>>>>>> successfully, and continues the job.  Sometimes, the job just continues
>>>>>>>>>>>>>> happily and there aren't any other problems.  However, that executor may
>>>>>>>>>>>>>> have to run a bunch of steps to re-compute some cached RDDs -- and during
>>>>>>>>>>>>>> that time, another executor may crash similarly, and then we end up in a
>>>>>>>>>>>>>> never-ending loop of one executor crashing, then trying to reload data,
>>>>>>>>>>>>>> while the others sit around.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have no idea what is triggering this behavior -- there
>>>>>>>>>>>>>> isn't any particular point in the job that it regularly occurs at.  Certain
>>>>>>>>>>>>>> steps seem more prone to this, but there isn't any step which regularly
>>>>>>>>>>>>>> causes the problem.  In a long pipeline of steps, though, that loop becomes
>>>>>>>>>>>>>> very likely.  I don't think it's a timeout issue -- the initial failing
>>>>>>>>>>>>>> executors can be actively completing stages just seconds before this
>>>>>>>>>>>>>> failure happens.  We did try adjusting some of the spark / akka timeouts:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     -Dspark.storage.blockManagerHeartBeatMs=300000
>>>>>>>>>>>>>>     -Dspark.akka.frameSize=150
>>>>>>>>>>>>>>     -Dspark.akka.timeout=120
>>>>>>>>>>>>>>     -Dspark.akka.askTimeout=30
>>>>>>>>>>>>>>     -Dspark.akka.logLifecycleEvents=true
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> but those settings didn't seem to help the problem at all.  I
>>>>>>>>>>>>>> figure it must be some configuration with the new version of akka that
>>>>>>>>>>>>>> we're missing, but we haven't found anything.  Any ideas?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> our code works fine w/ the 0.8.0 release on scala 2.9.3.  The
>>>>>>>>>>>>>> failures occur on the tip of the scala-2.10 branch (5429d62d)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks,
>>>>>>>>>>>>>> Imran
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> s
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> s
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> s
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> s
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> s
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> s
>>>
>>
>>
>>
>> --
>> s
>>
>
>


-- 
s
