We've been testing out the 2.10 branch of spark, and we're running into some issues where akka disconnects from the executors after a while.  We ran some simple tests first and all was well, so we started upgrading our whole codebase to 2.10.  Everything seemed to be working, but then we noticed that when we run long jobs, things start failing.


The first suspicious thing is that we get akka warnings about undeliverable messages sent to deadLetters:

2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.



Within a few seconds of the first such message there are generally a bunch more; then the executor is marked as failed and a new one is started:

2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor updated: app-20131029110000-0000/1 is now FAILED (Command exited with code 1)

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor added: app-20131029110000-0000/2 on worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544) with 24 cores

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://spark@ddd0.quantifind.com:43068] -> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]: Error [Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: dhd2.quantifind.com/10.10.5.64:45794]



The logs of the failed executor show some similar messages about undeliverable messages, but no indication of the underlying cause:

13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


After this happens, spark does launch a new executor successfully and continues the job.  Sometimes the job just continues happily and there aren't any other problems.  However, the new executor may have to run a bunch of steps to re-compute some cached RDDs -- and during that time, another executor may crash in the same way, and then we end up in a never-ending loop of one executor crashing and trying to reload data while the others sit idle.
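
As a possible stopgap for that recompute cascade (not a fix for the crashes), a replicated storage level keeps each cached partition on two executors, so losing one executor leaves a live copy instead of forcing a recompute of the whole lineage.  A minimal sketch against the 0.8-era API -- the master URL, app name, and input path are placeholders, not our actual job:

    import org.apache.spark.SparkContext
    import org.apache.spark.storage.StorageLevel

    object ReplicatedCacheSketch {
      def main(args: Array[String]) {
        // Placeholder master URL, app name, and input path.
        val sc = new SparkContext("spark://master:7077", "replicated-cache")
        val data = sc.textFile("hdfs:///some/input")
        // The _2 storage levels keep each cached partition on two executors,
        // so one executor crash leaves a live copy rather than triggering
        // re-computation of the RDD's lineage.
        data.persist(StorageLevel.MEMORY_AND_DISK_2)
        println(data.count())
        sc.stop()
      }
    }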

I have no idea what triggers this behavior -- it doesn't occur at any particular point in the job.  Certain steps seem more prone to it, but no single step reliably causes the problem; over a long pipeline of steps, though, that loop becomes very likely.  I don't think it's a timeout issue -- the initially failing executors can be actively completing stages just seconds before the failure happens.  We did try adjusting some of the spark / akka timeouts:

    -Dspark.storage.blockManagerHeartBeatMs=300000
    -Dspark.akka.frameSize=150
    -Dspark.akka.timeout=120
    -Dspark.akka.askTimeout=30
    -Dspark.akka.logLifecycleEvents=true

but those settings didn't seem to help at all.  I figure we must be missing some configuration for the new version of akka, but we haven't found anything.  Any ideas?
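
For completeness: in the 0.8-era API these are plain Java system properties, so the same values can also be set programmatically, as long as that happens before the SparkContext (and its actor system) is created.  A minimal sketch, with master URL and app name as placeholders:

    import org.apache.spark.SparkContext

    object TimeoutTuningSketch {
      def main(args: Array[String]) {
        // Mirror the -D flags above; these must be set before the
        // SparkContext is constructed, since the actor system reads
        // them at startup.
        System.setProperty("spark.storage.blockManagerHeartBeatMs", "300000")
        System.setProperty("spark.akka.frameSize", "150")
        System.setProperty("spark.akka.timeout", "120")
        System.setProperty("spark.akka.askTimeout", "30")
        System.setProperty("spark.akka.logLifecycleEvents", "true")

        // Placeholder master URL and app name.
        val sc = new SparkContext("spark://master:7077", "timeout-tuning")
        // ... run the job ...
        sc.stop()
      }
    }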

Our code works fine with the 0.8.0 release on scala 2.9.3.  The failures occur on the tip of the scala-2.10 branch (commit 5429d62d).

thanks,
Imran