spark-user mailing list archives

From Prashant Sharma <scrapco...@gmail.com>
Subject Re: executor failures w/ scala 2.10
Date Thu, 31 Oct 2013 06:37:38 GMT
Hi Imran,

My jobs have now been running for more than 18 hours without a single
executor failure. I can keep them running longer, but I don't think that is
going to help. So either you have to give me a way to reproduce the issue
(which would be great!), or you have to diagnose for yourself what
exactly is causing it. A disconnection usually results in a Disassociation
event, on which we terminate the executor; in normal operation this
usually means the driver has finished, and those log messages are totally
fine in that case. I am not sure why you would see this on the Scala 2.10
branch but not on the 0.8 release; I can't say anything more until I can see
what is happening.



On Thu, Oct 31, 2013 at 4:41 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:

> Can you show us the sample job?  Do you call sc.stop at the end, or
> System.exit?  Try sc.stop too.
>
>
> On Wed, Oct 30, 2013 at 10:42 PM, Imran Rashid <imran@quantifind.com> wrote:
>
>> Yeah, that just causes them to hang.
>>
>> The first "deadLetters" message shows up at about the same time.  Oddly,
>> after it first happens, I keep getting some results trickling in from those
>> executors (maybe they were just queued up on the driver already, I don't
>> know), but then it just hangs.  The stage has a few more tasks to run, but
>> the executors are just idle; they're not running anything.
>>
>> I'm going to try manually listening for the association events listed here
>> and logging them:
>> http://doc.akka.io/docs/akka/2.2.3/scala/remoting.html#remote-events
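A minimal sketch of what such a listener could look like in Akka 2.2 follows; the actor name and the wiring are illustrative only, not Spark's actual plumbing:

import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.remote.{AssociatedEvent, AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

// Logs every remoting lifecycle event published on the actor system's event stream.
class RemotingEventLogger extends Actor with ActorLogging {
  def receive = {
    case e: AssociatedEvent        => log.info("associated: {}", e)
    case e: DisassociatedEvent     => log.warning("disassociated: {}", e)
    case e: AssociationErrorEvent  => log.error(e.cause, "association error: {}", e)
    case e: RemotingLifecycleEvent => log.info("other remoting event: {}", e)
  }
}

// Hypothetical helper: subscribe the listener to all remoting lifecycle events.
def installRemotingLogger(system: ActorSystem): Unit = {
  val listener = system.actorOf(Props[RemotingEventLogger], "remoting-event-logger")
  system.eventStream.subscribe(listener, classOf[RemotingLifecycleEvent])
}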
>>
>> imran
>>
>>
>>
>>
>> On Wed, Oct 30, 2013 at 11:27 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
>>
>>> I am guessing something is wrong with using the Disassociation event, then.
>>>
>>> Try applying something along the lines of this patch.  It might cause the
>>> executors to hang, so be prepared for that.
>>>
>>> diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>> index 4e8052a..1ec5d19 100644
>>> --- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>> +++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
>>> @@ -74,9 +74,13 @@ private[spark] class StandaloneExecutorBackend(
>>>          executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
>>>        }
>>>
>>> -    case DisassociatedEvent(_, _, _) =>
>>> -      logError("Driver terminated or disconnected! Shutting down.")
>>> +    case Terminated(actor) =>
>>> +      logError("Driver terminated Shutting down.")
>>>        System.exit(1)
>>> +
>>> +    // case DisassociatedEvent(_, _, _) =>
>>> +    //   logError("Driver terminated or disconnected! Shutting down.")
>>> +    //   System.exit(1)
>>>    }
>>>
>>>    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
>>> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>> index b6f0ec9..9955484 100644
>>> --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>> +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>> @@ -102,8 +102,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>        case Terminated(actor) =>
>>>          actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
>>>
>>> -      case DisassociatedEvent(_, remoteAddress, _) =>
>>> -        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
>>> +      // case DisassociatedEvent(_, remoteAddress, _) =>
>>> +      //   addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
>>>
>>>        case AssociationErrorEvent(_, _, remoteAddress, _) =>
>>>          addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
>>> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>      // Remove a disconnected slave from the cluster
>>>      def removeExecutor(executorId: String, reason: String) {
>>>        if (executorActor.contains(executorId)) {
>>> -        logInfo("Executor " + executorId + " disconnected, so removing it")
>>> +        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
>>>          val numCores = freeCores(executorId)
>>>          actorToExecutorId -= executorActor(executorId)
>>>          addressToExecutorId -= executorAddress(executorId)
>>>
>>>
>>> On Wed, Oct 30, 2013 at 9:42 PM, Imran Rashid <imran@quantifind.com> wrote:
>>>
>>>> ok, so I applied a few patches
>>>>
>>>> https://github.com/quantifind/incubator-spark/pull/1/files
>>>>
>>>> and ran it again, with these options:
>>>>
>>>> -Dspark.akka.stdout-loglevel=DEBUG \
>>>>   -Dspark.akkaExtra.akka.logLevel=DEBUG \
>>>>   -Dspark.akkaExtra.akka.actor.debug.receive=on \
>>>>   -Dspark.akkaExtra.akka.actor.debug.autoreceive=on \
>>>>   -Dspark.akkaExtra.akka.actor.debug.lifecycle=on \
>>>>   -Dspark.akkaExtra.akka.remote.log-sent-messages=on \
>>>>   -Dspark.akkaExtra.akka.remote.log-received-messages=on \
>>>>   -Dspark.akkaExtra.akka.log-config-on-start=on
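For context, those spark.akkaExtra.* flags come from the patch linked above; presumably it strips the spark.akkaExtra. prefix and merges whatever remains into the Akka config before the actor system is created. A rough sketch of that kind of mapping (the helper name is made up, not the actual patch) might be:

import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.JavaConverters._

// Hypothetical helper: turn -Dspark.akkaExtra.akka.foo=bar system properties
// into an Akka Config containing "akka.foo = bar".
def akkaExtraConfig(prefix: String = "spark.akkaExtra."): Config = {
  val entries = System.getProperties.asScala.collect {
    case (k, v) if k.startsWith(prefix) => s"${k.stripPrefix(prefix)} = $v"
  }
  ConfigFactory.parseString(entries.mkString("\n"))
}

// e.g. akkaExtraConfig().withFallback(defaultAkkaConfig) before creating the ActorSystem.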
>>>>
>>>> On the driver, I see:
>>>>
>>>> 2013-10-30 08:44:31,034 [spark-akka.actor.default-dispatcher-19] INFO
>>>> akka.actor.LocalActorRef - Message
>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>> Actor[akka://spark/deadLetters] to
>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.64%3A52400-2#-837892141]
>>>> was not delivered. [1] dead letters encountered. This logging can be turned
>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>
>>>> 2013-10-30 08:44:31,058 [spark-akka.actor.default-dispatcher-13] INFO
>>>> org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor 1
>>>> disconnected, so removing it, reason:remote Akka client disconnected
>>>>
>>>> 2013-10-30 08:44:31,059 [spark-akka.actor.default-dispatcher-13] ERROR
>>>> org.apache.spark.scheduler.cluster.ClusterScheduler - Lost executor 1 on
>>>> dhd2.quantifind.com: remote Akka client disconnected
>>>>
>>>>
>>>> on the worker, stderr:
>>>>
>>>> 13/10/30 08:44:28 INFO executor.Executor: Finished task ID 934
>>>>
>>>> 13/10/30 08:44:31 ERROR executor.StandaloneExecutorBackend: Driver
>>>> terminated or disconnected! Shutting down.Disassociated
>>>> [akka.tcp://sparkExecutor@dhd2.quantifind.com:38021] ->
>>>> [akka.tcp://spark@ddd0.quantifind.com:36730]
>>>>
>>>> and unfortunately, all those akka debug options give me *no* useful
>>>> info in the worker stdout:
>>>>
>>>> Starting akka system "sparkExecutor" using config:
>>>>
>>>>       akka.daemonic = on
>>>>       akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
>>>>       akka.stdout-loglevel = "DEBUG"
>>>>       akka.actor.provider = "akka.remote.RemoteActorRefProvider"
>>>>       akka.remote.netty.tcp.transport-class =
>>>> "akka.remote.transport.netty.NettyTransport"
>>>>       akka.remote.netty.tcp.hostname = "dhd2.quantifind.com"
>>>>       akka.remote.netty.tcp.port = 0
>>>>       akka.remote.netty.tcp.connection-timeout = 60 s
>>>>       akka.remote.netty.tcp.maximum-frame-size = 10MiB
>>>>       akka.remote.netty.tcp.execution-pool-size = 4
>>>>       akka.actor.default-dispatcher.throughput = 15
>>>>       akka.remote.log-remote-lifecycle-events = off
>>>>       akka.remote.log-sent-messages = on
>>>>       akka.remote.log-received-messages = on
>>>>       akka.logLevel = DEBUG
>>>>       akka.actor.debug.autoreceive = on
>>>>       akka.actor.debug.lifecycle = on
>>>>       akka.actor.debug.receive = on
>>>>       akka.log-config-on-start = on
>>>>       akka.remote.quarantine-systems-for = off
>>>> [DEBUG] [10/30/2013 08:40:30.230] [main] [EventStream]
>>>> StandardOutLogger started
>>>> [DEBUG] [10/30/2013 08:40:30.438]
>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
>>>> started (akka.actor.LocalActorRefProvider$Guardian@4bf54c5f)
>>>> [DEBUG] [10/30/2013 08:40:30.446]
>>>> [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
>>>> started (akka.actor.LocalActorRefProvider$Guardian@72608760)
>>>> [DEBUG] [10/30/2013 08:40:30.447]
>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>> [akka://sparkExecutor/system] started
>>>> (akka.actor.LocalActorRefProvider$SystemGuardian@1f57ea4a)
>>>> [DEBUG] [10/30/2013 08:40:30.454]
>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
>>>> supervising Actor[akka://sparkExecutor/user]
>>>> [DEBUG] [10/30/2013 08:40:30.454]
>>>> [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
>>>> supervising Actor[akka://sparkExecutor/system]
>>>> [DEBUG] [10/30/2013 08:40:30.468]
>>>> [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
>>>> now monitoring Actor[akka://sparkExecutor/system]
>>>> [DEBUG] [10/30/2013 08:40:30.468]
>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>> [akka://sparkExecutor/system] now monitoring Actor[akka://sparkExecutor/]
>>>> [DEBUG] [10/30/2013 08:40:30.476]
>>>> [sparkExecutor-akka.actor.default-dispatcher-3]
>>>> [akka://sparkExecutor/system/log1-Slf4jLogger] started
>>>> (akka.event.slf4j.Slf4jLogger@24988707)
>>>> [DEBUG] [10/30/2013 08:40:30.477]
>>>> [sparkExecutor-akka.actor.default-dispatcher-4]
>>>> [akka://sparkExecutor/system] now supervising
>>>> Actor[akka://sparkExecutor/system/log1-Slf4jLogger#719056881]
>>>>
>>>> (followed by similar messages for the "spark" system)
>>>>
>>>> I dunno if this means much more to you, but it seems to me that for
>>>> some reason the executor decides to disconnect from the master --
>>>> unfortunately we don't know why.  I think my logging configuration is not
>>>> getting applied correctly, or "log-sent-messages" & "log-received-messages"
>>>> don't do what I think they do ... something conflicting must be turning that
>>>> logging off.  There are a zillion different remoting settings:
>>>> http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
>>>>
>>>> I feel like I really need to get those messages on why it disconnected
>>>> to know which ones to play with.  Any ideas for config changes to see those
>>>> messages?
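For what it's worth, the relevant knobs in stock Akka 2.2 live under akka.remote plus the dead-letter switches; a minimal snippet would look roughly like the following. Whether Spark's config plumbing actually passes these through is exactly the open question here, and note that stock Akka reads akka.loglevel (all lowercase), whereas the dump above shows akka.logLevel:

akka {
  loglevel = "DEBUG"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on
  remote {
    log-remote-lifecycle-events = on   # shows up as "off" in the config dump above
    log-sent-messages = on
    log-received-messages = on
  }
}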
>>>>
>>>> thanks
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Oct 30, 2013 at 10:09 AM, Prashant Sharma <scrapcodes@gmail.com
>>>> > wrote:
>>>>
>>>>> Can you apply this patch too and check the logs of the driver and the worker?
>>>>>
>>>>> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> index b6f0ec9..ad0ebf7 100644
>>>>> --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
>>>>> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>>>>>      // Remove a disconnected slave from the cluster
>>>>>      def removeExecutor(executorId: String, reason: String) {
>>>>>        if (executorActor.contains(executorId)) {
>>>>> -        logInfo("Executor " + executorId + " disconnected, so removing it")
>>>>> +        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
>>>>>          val numCores = freeCores(executorId)
>>>>>          actorToExecutorId -= executorActor(executorId)
>>>>>          addressToExecutorId -= executorAddress(executorId)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Oct 30, 2013 at 8:18 PM, Imran Rashid <imran@quantifind.com> wrote:
>>>>>
>>>>>> I just realized something about the failing stages -- they generally
>>>>>> occur in steps like this:
>>>>>>
>>>>>> rdd.mapPartitions{itr =>
>>>>>>   val myCounters = initializeSomeDataStructure()
>>>>>>   itr.foreach{
>>>>>>     //update myCounter in here
>>>>>>     ...
>>>>>>   }
>>>>>>
>>>>>>   myCounters.iterator.map{
>>>>>>     //some other transformation here ...
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> that is, as a partition is processed, nothing gets output, we just
>>>>>> accumulate some values.  Only at the end of the partition do we output
>>>>>> some accumulated values.
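As a concrete, runnable stand-in for that pattern (the word-counting content here is made up; only the shape matters), it looks something like:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Consume the whole partition, accumulate locally, and emit only at the end.
def perPartitionCounts(lines: RDD[String]): RDD[(String, Long)] = {
  lines.mapPartitions { itr =>
    val counts = mutable.HashMap.empty[String, Long].withDefaultValue(0L)
    itr.foreach { line =>
      line.split("\\s+").foreach(w => counts(w) += 1)  // update the counters in here
    }
    counts.iterator  // any further transformation of the accumulated values goes here
  }
}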
>>>>>>
>>>>>> These stages don't always fail, and generally they do succeed after
>>>>>> the executor has died and a new one has started -- so I'm pretty confident
>>>>>> it's not a problem w/ the code.  But maybe we need to add something like a
>>>>>> periodic heartbeat in this kind of operation?
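I'm not sure what shape a built-in heartbeat would take, but purely as a diagnostic, user-level stand-in one could wrap the partition iterator so a background thread logs liveness while the partition is being consumed. This would not by itself keep the driver-executor Akka association alive, and all the names below are illustrative:

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Wrap an iterator so a daemon thread periodically logs that the task is still making progress.
def withProgressLogging[T](taskName: String, itr: Iterator[T], periodSec: Long = 30): Iterator[T] = {
  val seen = new AtomicLong(0)
  val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable) = { val t = new Thread(r, taskName + "-progress"); t.setDaemon(true); t }
  })
  scheduler.scheduleAtFixedRate(new Runnable {
    def run(): Unit = println(s"$taskName still running, ${seen.get} records so far")
  }, periodSec, periodSec, TimeUnit.SECONDS)
  new Iterator[T] {
    def hasNext = {
      val more = itr.hasNext
      if (!more) scheduler.shutdown()  // stop logging once the partition is exhausted
      more
    }
    def next() = { seen.incrementAndGet(); itr.next() }
  }
}

It would be used inside the mapPartitions closure, e.g. rdd.mapPartitions { itr => process(withProgressLogging("myStage", itr)) }.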
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <imran@quantifind.com> wrote:
>>>>>>
>>>>>>> I'm gonna try turning on more akka debugging msgs as described at
>>>>>>> http://akka.io/faq/
>>>>>>> and
>>>>>>>
>>>>>>> http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations
>>>>>>>
>>>>>>> unfortunately that will require a patch to spark, but hopefully that
>>>>>>> will give us more info to go on ...
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma <
>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have had things running (from the scala-2.10 branch) for over 3-4 hours
>>>>>>>> now without a problem, and my jobs write about the same amount of data as
>>>>>>>> you suggested.  My cluster is 7 nodes and is not *congested* for memory.
>>>>>>>> I am going to leave jobs running all night long.  Meanwhile, I would
>>>>>>>> encourage you to try to narrow the problem down to something reproducible;
>>>>>>>> that can help a ton in fixing the issue.
>>>>>>>>
>>>>>>>> Thanks for testing and reporting your experience.  I still feel there is
>>>>>>>> something else wrong!  As for tolerance of network connection timeouts,
>>>>>>>> setting those properties should work, but I am worried about the
>>>>>>>> Disassociation event.  I will have to check; this is indeed a hard bug to
>>>>>>>> reproduce, if that is what it is.  I mean, how do I simulate network delays?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <imran@quantifind.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> This is a spark-standalone setup (not mesos), on our own cluster.
>>>>>>>>>
>>>>>>>>> At first I thought it must be some temporary network problem too --
>>>>>>>>> but the times between receiving task completion events from an executor
>>>>>>>>> and declaring it failed are really small, so I didn't think that could
>>>>>>>>> possibly be it.  Plus we tried increasing various akka timeouts, but that
>>>>>>>>> didn't help.  Or maybe there are some other spark / akka properties we
>>>>>>>>> should be setting?  It certainly should be resilient to such a temporary
>>>>>>>>> network issue, if that is the problem.
>>>>>>>>>
>>>>>>>>> btw, I think I've noticed this happens most often during
>>>>>>>>> ShuffleMapTasks.  The tasks write out very small amounts of data (64 MB
>>>>>>>>> total for the entire stage).
>>>>>>>>>
>>>>>>>>> thanks
>>>>>>>>>
>>>>>>>>> On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma <
>>>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Are you using mesos?  I admit I have not properly tested things
>>>>>>>>>> on mesos, though.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 30, 2013 at 11:31 AM, Prashant Sharma <
>>>>>>>>>> scrapcodes@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Those log messages are new in Akka 2.2 and are usually seen
>>>>>>>>>>> when a node is disassociated from another, either by a network failure
>>>>>>>>>>> or even a clean shutdown.  This suggests some network issue to me; are
>>>>>>>>>>> you running on EC2?  It might be a temporary thing in that case.
>>>>>>>>>>>
>>>>>>>>>>> I would like more details on the long jobs, though: how long do they run?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid <
>>>>>>>>>>> imran@quantifind.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> We've been testing out the 2.10 branch of spark, and we're
>>>>>>>>>>>> running into some issues where akka disconnects from the executors after a
>>>>>>>>>>>> while.  We ran some simple tests first, and all was well, so we started
>>>>>>>>>>>> upgrading our whole codebase to 2.10.  Everything seemed to be working, but
>>>>>>>>>>>> then we noticed that when we run long jobs, things start failing.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The first suspicious thing is that we get akka warnings about
>>>>>>>>>>>> undeliverable messages sent to deadLetters:
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO
>>>>>>>>>>>> akka.actor.LocalActorRef - Message
>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>>>>>> was not delivered. [4] dead letters encountered. This logging can be turned
>>>>>>>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO
>>>>>>>>>>>> akka.actor.LocalActorRef - Message
>>>>>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>>>>>> was not delivered. [5] dead letters encountered. This logging can be turned
>>>>>>>>>>>> off or adjusted with configuration settings 'akka.log-dead-letters' and
>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Generally within a few seconds after the first such message,
>>>>>>>>>>>> there are a bunch more, and then the executor is marked as failed, and a
>>>>>>>>>>>> new one is started:
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3]
>>>>>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716]
>>>>>>>>>>>> was not delivered. [10] dead letters encountered, no more dead
>>>>>>>>>>>> letters will be logged. This logging can be turned off or adjusted with
>>>>>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO
>>>>>>>>>>>> org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
>>>>>>>>>>>> app-20131029110000-0000/1 is now FAILED (Command exited with code 1)
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO
>>>>>>>>>>>> org.apache.spark.deploy.client.Client$ClientActor - Executor added:
>>>>>>>>>>>> app-20131029110000-0000/2 on
>>>>>>>>>>>> worker-20131029105824-dhd2.quantifind.com-51544
>>>>>>>>>>>> (dhd2.quantifind.com:51544) with 24 cores
>>>>>>>>>>>>
>>>>>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18]
>>>>>>>>>>>> ERROR akka.remote.EndpointWriter - AssociationError
>>>>>>>>>>>> [akka.tcp://spark@ddd0.quantifind.com:43068] ->
>>>>>>>>>>>> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]: Error
>>>>>>>>>>>> [Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]] [
>>>>>>>>>>>> akka.remote.EndpointAssociationException: Association failed
>>>>>>>>>>>> with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]
>>>>>>>>>>>> Caused by:
>>>>>>>>>>>> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>>>>>>>>>>> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Looking in the logs of the failed executor, there are some
>>>>>>>>>>>> similar messages about undeliverable messages, but I don't see any reason:
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>>>>>> [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>>>>>> was not delivered. [1] dead letters encountered. This logging
>>>>>>>>>>>> can be turned off or adjusted with configuration settings
>>>>>>>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>>>>>> was not delivered. [2] dead letters encountered. This logging
>>>>>>>>>>>> can be turned off or adjusted with configuration settings
>>>>>>>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>>>>>> was not delivered. [3] dead letters encountered. This logging
>>>>>>>>>>>> can be turned off or adjusted with configuration settings
>>>>>>>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend:
>>>>>>>>>>>> Driver terminated or disconnected! Shutting down.
>>>>>>>>>>>>
>>>>>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>>>>>> was not delivered. [4] dead letters encountered. This logging
>>>>>>>>>>>> can be turned off or adjusted with configuration settings
>>>>>>>>>>>> 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After this happens, spark does launch a new executor
>>>>>>>>>>>> successfully, and continues the job.  Sometimes, the job just continues
>>>>>>>>>>>> happily and there aren't any other problems.  However, that executor may
>>>>>>>>>>>> have to run a bunch of steps to re-compute some cached RDDs -- and during
>>>>>>>>>>>> that time, another executor may crash similarly, and then we end up in a
>>>>>>>>>>>> never-ending loop of one executor crashing, then trying to reload data,
>>>>>>>>>>>> while the others sit around.
>>>>>>>>>>>>
>>>>>>>>>>>> I have no idea what is triggering this behavior -- there isn't
>>>>>>>>>>>> any particular point in the job that it regularly occurs at.  Certain steps
>>>>>>>>>>>> seem more prone to this, but there isn't any step which regularly causes
>>>>>>>>>>>> the problem.  In a long pipeline of steps, though, that loop becomes very
>>>>>>>>>>>> likely.  I don't think it's a timeout issue -- the initial failing executors
>>>>>>>>>>>> can be actively completing stages just seconds before this failure
>>>>>>>>>>>> happens.  We did try adjusting some of the spark / akka timeouts:
>>>>>>>>>>>>
>>>>>>>>>>>>     -Dspark.storage.blockManagerHeartBeatMs=300000
>>>>>>>>>>>>     -Dspark.akka.frameSize=150
>>>>>>>>>>>>     -Dspark.akka.timeout=120
>>>>>>>>>>>>     -Dspark.akka.askTimeout=30
>>>>>>>>>>>>     -Dspark.akka.logLifecycleEvents=true
>>>>>>>>>>>>
>>>>>>>>>>>> but those settings didn't seem to help the problem at all.  I
>>>>>>>>>>>> figure it must be some configuration with the new version of akka that
>>>>>>>>>>>> we're missing, but we haven't found anything.  Any ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> our code works fine w/ the 0.8.0 release on scala 2.9.3.  The
>>>>>>>>>>>> failures occur on the tip of the scala-2.10 branch (5429d62d)
>>>>>>>>>>>>
>>>>>>>>>>>> thanks,
>>>>>>>>>>>> Imran
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> s
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> s
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> s
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> s
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> s
>>>
>>
>>
>
>
> --
> s
>



-- 
s
