spark-user mailing list archives

From: Imran Rashid <im...@quantifind.com>
Subject: Re: executor failures w/ scala 2.10
Date: Wed, 30 Oct 2013 16:12:06 GMT
ok, so I applied a few patches

https://github.com/quantifind/incubator-spark/pull/1/files

and ran it again, with these options:

  -Dspark.akka.stdout-loglevel=DEBUG \
  -Dspark.akkaExtra.akka.logLevel=DEBUG \
  -Dspark.akkaExtra.akka.actor.debug.receive=on \
  -Dspark.akkaExtra.akka.actor.debug.autoreceive=on \
  -Dspark.akkaExtra.akka.actor.debug.lifecycle=on \
  -Dspark.akkaExtra.akka.remote.log-sent-messages=on \
  -Dspark.akkaExtra.akka.remote.log-received-messages=on \
  -Dspark.akkaExtra.akka.log-config-on-start=on
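For context on the spark.akkaExtra.* properties: the patch forwards anything
after that prefix into the akka config (you can see them show up in the config
dump below).  A simplified sketch of the idea -- illustrative only, not the
exact patch code -- assuming the properties are prefix-stripped and parsed as
HOCON overrides on top of the defaults:

    import com.typesafe.config.{Config, ConfigFactory}
    import scala.collection.JavaConverters._

    def akkaExtraConfig(defaults: Config): Config = {
      val prefix = "spark.akkaExtra."
      // collect -Dspark.akkaExtra.* system properties, drop the prefix, and
      // turn each one into a single line of HOCON: key = "value"
      val overrides = System.getProperties.asScala
        .collect { case (k, v) if k.startsWith(prefix) =>
          k.stripPrefix(prefix) + " = \"" + v + "\""
        }
        .mkString("\n")
      // explicit overrides win; everything else falls back to the defaults
      ConfigFactory.parseString(overrides).withFallback(defaults)
    }

The resulting Config is then handed to ActorSystem(name, config) as usual.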

On the driver, I see:

2013-10-30 08:44:31,034 [spark-akka.actor.default-dispatcher-19] INFO
akka.actor.LocalActorRef - Message
[akka.remote.transport.AssociationHandle$Disassociated] from
Actor[akka://spark/deadLetters] to
Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.64%3A52400-2#-837892141]
was not delivered. [1] dead letters encountered. This logging can be turned
off or adjusted with configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.

2013-10-30 08:44:31,058 [spark-akka.actor.default-dispatcher-13] INFO
org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor 1
disconnected, so removing it, reason:remote Akka client disconnected

2013-10-30 08:44:31,059 [spark-akka.actor.default-dispatcher-13] ERROR
org.apache.spark.scheduler.cluster.ClusterScheduler - Lost executor 1 on
dhd2.quantifind.com: remote Akka client disconnected


on the worker, stderr:

13/10/30 08:44:28 INFO executor.Executor: Finished task ID 934

13/10/30 08:44:31 ERROR executor.StandaloneExecutorBackend: Driver
terminated or disconnected! Shutting down.Disassociated [akka.tcp://
sparkExecutor@dhd2.quantifind.com:38021] -> [akka.tcp://
spark@ddd0.quantifind.com:36730]

and unfortunately, all those akka debug options give me *no* useful info in
the worker stdout:

Starting akka system "sparkExecutor" using config:

      akka.daemonic = on
      akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      akka.stdout-loglevel = "DEBUG"
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      akka.remote.netty.tcp.hostname = "dhd2.quantifind.com"
      akka.remote.netty.tcp.port = 0
      akka.remote.netty.tcp.connection-timeout = 60 s
      akka.remote.netty.tcp.maximum-frame-size = 10MiB
      akka.remote.netty.tcp.execution-pool-size = 4
      akka.actor.default-dispatcher.throughput = 15
      akka.remote.log-remote-lifecycle-events = off
      akka.remote.log-sent-messages = on
      akka.remote.log-received-messages = on
      akka.logLevel = DEBUG
      akka.actor.debug.autoreceive = on
      akka.actor.debug.lifecycle = on
      akka.actor.debug.receive = on
      akka.log-config-on-start = on
      akka.remote.quarantine-systems-for = off
[DEBUG] [10/30/2013 08:40:30.230] [main] [EventStream] StandardOutLogger
started
[DEBUG] [10/30/2013 08:40:30.438]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/]
started (akka.actor.LocalActorRefProvider$Guardian@4bf54c5f)
[DEBUG] [10/30/2013 08:40:30.446]
[sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
started (akka.actor.LocalActorRefProvider$Guardian@72608760)
[DEBUG] [10/30/2013 08:40:30.447]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] started
(akka.actor.LocalActorRefProvider$SystemGuardian@1f57ea4a)
[DEBUG] [10/30/2013 08:40:30.454]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
supervising Actor[akka://sparkExecutor/user]
[DEBUG] [10/30/2013 08:40:30.454]
[sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now
supervising Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468]
[sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user]
now monitoring Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] now monitoring Actor[akka://sparkExecutor/]
[DEBUG] [10/30/2013 08:40:30.476]
[sparkExecutor-akka.actor.default-dispatcher-3]
[akka://sparkExecutor/system/log1-Slf4jLogger] started
(akka.event.slf4j.Slf4jLogger@24988707)
[DEBUG] [10/30/2013 08:40:30.477]
[sparkExecutor-akka.actor.default-dispatcher-4]
[akka://sparkExecutor/system] now supervising
Actor[akka://sparkExecutor/system/log1-Slf4jLogger#719056881]

(followed by similar messages for the "spark" system)

I don't know if this means much more to you, but it seems to me that for some
reason the executor decides to disconnect from the driver -- unfortunately we
still don't know why.  I think my logging configuration is not getting applied
correctly, or "log-sent-messages" & "log-received-messages" don't do what I
think they do ... something conflicting must be turning that logging off.
There are a zillion different remoting settings:
http://doc.akka.io/docs/akka/snapshot/scala/remoting.html

I feel like I really need to see the messages about why it disconnected before
I know which settings to play with.  Any ideas for config changes that would
surface those messages?
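One thing worth double-checking: akka's log level key is "akka.loglevel" (all
lowercase), so an "akka.logLevel" entry like the one in the dump above would
simply be ignored, which could explain why the extra debug output never shows
up.  I'm also going to dump the config the running ActorSystem actually
resolved, instead of trusting the string I pass in.  A minimal sketch
(assumptions: plain akka 2.2 API, the helper name is mine, nothing
spark-specific):

    import akka.actor.ActorSystem

    def dumpRemoteLoggingSettings(system: ActorSystem): Unit = {
      // settings.config is the fully-resolved config the system is running with
      val conf = system.settings.config
      Seq(
        "akka.loglevel",
        "akka.remote.log-sent-messages",
        "akka.remote.log-received-messages",
        "akka.remote.log-remote-lifecycle-events"
      ).foreach { key =>
        println(key + " = " + conf.getAnyRef(key))
      }
    }

And if "akka.remote.log-remote-lifecycle-events" really is off (as the dump
above suggests), turning it on is probably the first thing to try for seeing
why the association is torn down.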

thanks




On Wed, Oct 30, 2013 at 10:09 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:

> Can you apply this patch too and check the logs of the driver and worker?
>
> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> index b6f0ec9..ad0ebf7 100644
> --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
> @@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
>      // Remove a disconnected slave from the cluster
>      def removeExecutor(executorId: String, reason: String) {
>        if (executorActor.contains(executorId)) {
> -        logInfo("Executor " + executorId + " disconnected, so removing it")
> +        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
>          val numCores = freeCores(executorId)
>          actorToExecutorId -= executorActor(executorId)
>          addressToExecutorId -= executorAddress(executorId)
>
>
>
>
> On Wed, Oct 30, 2013 at 8:18 PM, Imran Rashid <imran@quantifind.com> wrote:
>
>> I just realized something about the failing stages -- they generally
>> occur in steps like this:
>>
>> rdd.mapPartitions{itr =>
>>   val myCounters = initializeSomeDataStructure()
>>   itr.foreach{
>>     //update myCounter in here
>>     ...
>>   }
>>
>>   myCounters.iterator.map{
>>     //some other transformation here ...
>>   }
>> }
>>
>> that is, as a partition is processed, nothing gets output, we just
>> accumulate some values.  Only at the end of the partition do we output some
>> accumulated values.
>>
>> These stages don't always fail, and generally they do succeed after the
>> executor has died and a new one has started -- so I'm pretty confident it's
>> not a problem w/ the code.  But maybe we need to add something like a
>> periodic heartbeat in this kind of operation?
>>
>>
>>
>> On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <imran@quantifind.com> wrote:
>>
>>> I'm gonna try turning on more akka debugging msgs as described at
>>> http://akka.io/faq/
>>> and
>>>
>>> http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations
>>>
>>> unfortunately that will require a patch to spark, but hopefully that
>>> will give us more info to go on ...
>>>
>>>
>>> On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
>>>
>>>> I have things running (from the scala 2.10 branch) for over 3-4 hours now
>>>> without a problem, and my jobs write about the same amount of data as you
>>>> suggested.  My cluster size is 7 nodes and not *congested* for memory.  I
>>>> am going to leave jobs running all night long.  Meanwhile I'd encourage
>>>> you to try to narrow the problem down to something reproducible; that can
>>>> help a ton in fixing the issue.
>>>>
>>>> Thanks for testing and reporting your experience.  I still feel there is
>>>> something else wrong!  About tolerance for network connection timeouts,
>>>> setting those properties should work, but I am worried about the
>>>> Disassociation event.  I will have to check; this is indeed a hard bug to
>>>> reproduce, if it is one -- I mean, how do I simulate network delays?
>>>>
>>>>
>>>>> On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <imran@quantifind.com> wrote:
>>>>
>>>>> This is a spark-standalone setup (not mesos), on our own cluster.
>>>>>
>>>>> At first I thought it must be some temporary network problem too -- but
>>>>> the times between receiving task completion events from an executor and
>>>>> declaring it failed are really small, so I didn't think that could
>>>>> possibly be it.  Plus we tried increasing various akka timeouts, but
>>>>> that didn't help.  Or maybe there are some other spark / akka properties
>>>>> we should be setting?  It certainly should be resilient to such a
>>>>> temporary network issue, if that is the problem.
>>>>>
>>>>> btw, I think I've noticed this happens most often during ShuffleMapTasks.
>>>>> The tasks write out very small amounts of data (64 MB total for the
>>>>> entire stage).
>>>>>
>>>>> thanks
>>>>>
>>>>>> On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
>>>>>>
>>>>>>> Are you using mesos?  I admit to not having properly tested things on
>>>>>>> mesos though.
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 30, 2013 at 11:31 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
>>>>>>>
>>>>>>>> Those log messages are new in Akka 2.2 and are usually seen when a
>>>>>>>> node is disassociated from another, by either a network failure or
>>>>>>>> even a clean shutdown.  This suggests some network issue to me -- are
>>>>>>>> you running on EC2?  It might be a temporary thing in that case.
>>>>>>>>
>>>>>>>> I'd like to have more details on the long jobs though -- how long?
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid <imran@quantifind.com> wrote:
>>>>>>>
>>>>>>>> We've been testing out the 2.10 branch of spark, and we're running
>>>>>>>> into some issues where akka disconnects from the executors after a
>>>>>>>> while.  We ran some simple tests first, and all was well, so we
>>>>>>>> started upgrading our whole codebase to 2.10.  Everything seemed to
>>>>>>>> be working, but then we noticed that when we run long jobs, things
>>>>>>>> start failing.
>>>>>>>>
>>>>>>>>
>>>>>>>> The first suspicious thing is that we get akka warnings about
>>>>>>>> undeliverable messages sent to deadLetters:
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>> was not delivered. [4] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700]
>>>>>>>> was not delivered. [5] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Generally within a few seconds after the first such message, there
>>>>>>>> are a bunch more, and then the executor is marked as failed, and a
>>>>>>>> new one is started:
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3]
>>>>>>>> INFO  akka.actor.LocalActorRef - Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>> Actor[akka://spark/deadLetters] to
>>>>>>>> Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716]
>>>>>>>> was not delivered. [10] dead letters encountered, no more dead letters
>>>>>>>> will be logged. This logging can be turned off or adjusted with
>>>>>>>> configuration settings 'akka.log-dead-letters' and
>>>>>>>> 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor updated:
>>>>>>>> app-20131029110000-0000/1 is now FAILED (Command exited with code 1)
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17]
>>>>>>>> INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor added:
>>>>>>>> app-20131029110000-0000/2 on
>>>>>>>> worker-20131029105824-dhd2.quantifind.com-51544
>>>>>>>> (dhd2.quantifind.com:51544) with 24 cores
>>>>>>>>
>>>>>>>> 2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18]
>>>>>>>> ERROR akka.remote.EndpointWriter - AssociationError
>>>>>>>> [akka.tcp://spark@ddd0.quantifind.com:43068] ->
>>>>>>>> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]: Error
>>>>>>>> [Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]] [
>>>>>>>> akka.remote.EndpointAssociationException: Association failed with
>>>>>>>> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]
>>>>>>>> Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
>>>>>>>> Connection refused: dhd2.quantifind.com/10.10.5.64:45794]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Looking in the logs of the failed executor, there are some similar
>>>>>>>> messages about undeliverable messages, but I don't see any reason:
>>>>>>>>
>>>>>>>> 13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>> was not delivered. [1] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>> was not delivered. [2] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.AssociationHandle$Disassociated] from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>> was not delivered. [3] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: Driver
>>>>>>>> terminated or disconnected! Shutting down.
>>>>>>>>
>>>>>>>> 13/10/29 11:03:53 INFO actor.LocalActorRef: Message
>>>>>>>> [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from
>>>>>>>> Actor[akka://sparkExecutor/deadLetters] to
>>>>>>>> Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548]
>>>>>>>> was not delivered. [4] dead letters encountered. This logging can be
>>>>>>>> turned off or adjusted with configuration settings 'akka.log-dead-letters'
>>>>>>>> and 'akka.log-dead-letters-during-shutdown'.
>>>>>>>>
>>>>>>>>
>>>>>>>> After this happens, spark does launch a new executor successfully and
>>>>>>>> continue the job.  Sometimes the job just continues happily and there
>>>>>>>> aren't any other problems.  However, that executor may have to run a
>>>>>>>> bunch of steps to re-compute some cached RDDs -- and during that time,
>>>>>>>> another executor may crash similarly, and then we end up in a
>>>>>>>> never-ending loop of one executor crashing, then trying to reload
>>>>>>>> data, while the others sit around.
>>>>>>>>
>>>>>>>> I have no idea what is triggering this behavior -- there isn't any
>>>>>>>> particular point in the job that it regularly occurs at.  Certain
>>>>>>>> steps seem more prone to this, but there isn't any step which
>>>>>>>> regularly causes the problem.  In a long pipeline of steps, though,
>>>>>>>> that loop becomes very likely.  I don't think it's a timeout issue --
>>>>>>>> the initial failing executors can be actively completing stages just
>>>>>>>> seconds before this failure happens.  We did try adjusting some of
>>>>>>>> the spark / akka timeouts:
>>>>>>>>
>>>>>>>>     -Dspark.storage.blockManagerHeartBeatMs=300000
>>>>>>>>     -Dspark.akka.frameSize=150
>>>>>>>>     -Dspark.akka.timeout=120
>>>>>>>>     -Dspark.akka.askTimeout=30
>>>>>>>>     -Dspark.akka.logLifecycleEvents=true
>>>>>>>>
>>>>>>>> but those settings didn't seem to help the problem at all.  I figure
>>>>>>>> it must be some configuration with the new version of akka that we're
>>>>>>>> missing, but we haven't found anything.  Any ideas?
>>>>>>>>
>>>>>>>> our code works fine w/ the 0.8.0 release on scala 2.9.3.  The failures
>>>>>>>> occur on the tip of the scala-2.10 branch (5429d62d)
>>>>>>>>
>>>>>>>> thanks,
>>>>>>>> Imran
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> s
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> s
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> s
>>>>
>>>
>>>
>>
>
>
> --
> s
>
