I am guessing something is wrong with using the DisassociatedEvent then.

Try applying something along the lines of this patch. This might cause the executors to hang (I think Terminated is only delivered for actors we explicitly watch, so the executor may never notice the driver going away), so be prepared for that.

diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 4e8052a..1ec5d19 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -74,9 +74,13 @@ private[spark] class StandaloneExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
-    case DisassociatedEvent(_, _, _) =>
-      logError("Driver terminated or disconnected! Shutting down.")
+    case Terminated(actor) =>
+      logError("Driver terminated Shutting down.")
       System.exit(1)
+
+    // case DisassociatedEvent(_, _, _) =>
+    //   logError("Driver terminated or disconnected! Shutting down.")
+    //   System.exit(1)
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b6f0ec9..9955484 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -102,8 +102,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
-      case DisassociatedEvent(_, remoteAddress, _) =>
-        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
+      // case DisassociatedEvent(_, remoteAddress, _) =>
+      //   addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
 
       case AssociationErrorEvent(_, _, remoteAddress, _) =>
         addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
@@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
       if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
+        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
         val numCores = freeCores(executorId)
         actorToExecutorId -= executorActor(executorId)
         addressToExecutorId -= executorAddress(executorId)



On Wed, Oct 30, 2013 at 9:42 PM, Imran Rashid <imran@quantifind.com> wrote:
I applied that patch and ran it again, with these options:

-Dspark.akka.stdout-loglevel=DEBUG \
  -Dspark.akkaExtra.akka.logLevel=DEBUG\
  -Dspark.akkaExtra.akka.actor.debug.receive=on \
  -Dspark.akkaExtra.akka.actor.debug.autoreceive=on \
  -Dspark.akkaExtra.akka.actor.debug.lifecycle=on \
  -Dspark.akkaExtra.akka.remote.log-sent-messages=on \
  -Dspark.akkaExtra.akka.remote.log-received-messages=on\
  -Dspark.akkaExtra.akka.log-config-on-start=on
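
(The idea behind the spark.akkaExtra.* properties: anything with that prefix gets the prefix stripped and appended to the Akka config string Spark builds in AkkaUtils.createActorSystem. Roughly like the sketch below -- this is just an illustration, not the actual patch:)

import scala.collection.JavaConverters._
import com.typesafe.config.ConfigFactory

// sketch only: collect every "spark.akkaExtra.*" system property, strip the
// prefix, and append the remainder to the Akka config string
val baseAkkaConf: String = ???  // whatever AkkaUtils.createActorSystem already generates
val extraAkkaConf = System.getProperties.asScala
  .collect { case (key, value) if key.startsWith("spark.akkaExtra.") =>
    key.stripPrefix("spark.akkaExtra.") + " = " + value
  }
  .mkString("\n")
val akkaConf = ConfigFactory.parseString(baseAkkaConf + "\n" + extraAkkaConf)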

On the driver, I see:

2013-10-30 08:44:31,034 [spark-akka.actor.default-dispatcher-19] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.64%3A52400-2#-837892141] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-30 08:44:31,058 [spark-akka.actor.default-dispatcher-13] INFO  org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor 1 disconnected, so removing it, reason:remote Akka client disconnected

2013-10-30 08:44:31,059 [spark-akka.actor.default-dispatcher-13] ERROR org.apache.spark.scheduler.cluster.ClusterScheduler - Lost executor 1 on dhd2.quantifind.com: remote Akka client disconnected


on the worker, stderr:

13/10/30 08:44:28 INFO executor.Executor: Finished task ID 934

13/10/30 08:44:31 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.Disassociated [akka.tcp://sparkExecutor@dhd2.quantifind.com:38021] -> [akka.tcp://spark@ddd0.quantifind.com:36730]

and unfortunately, all those akka debug options give me *no* useful info in the worker stdout:

Starting akka system "sparkExecutor" using config:

      akka.daemonic = on
      akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      akka.stdout-loglevel = "DEBUG"
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      akka.remote.netty.tcp.hostname = "dhd2.quantifind.com"
      akka.remote.netty.tcp.port = 0
      akka.remote.netty.tcp.connection-timeout = 60 s
      akka.remote.netty.tcp.maximum-frame-size = 10MiB
      akka.remote.netty.tcp.execution-pool-size = 4
      akka.actor.default-dispatcher.throughput = 15
      akka.remote.log-remote-lifecycle-events = off
      akka.remote.log-sent-messages = on
      akka.remote.log-received-messages = on
      akka.logLevel = DEBUG
      akka.actor.debug.autoreceive = on
      akka.actor.debug.lifecycle = on
      akka.actor.debug.receive = on
      akka.log-config-on-start = on
      akka.remote.quarantine-systems-for = off
[DEBUG] [10/30/2013 08:40:30.230] [main] [EventStream] StandardOutLogger started
[DEBUG] [10/30/2013 08:40:30.438] [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] started (akka.actor.LocalActorRefProvider$Guardian@4bf54c5f)
[DEBUG] [10/30/2013 08:40:30.446] [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user] started (akka.actor.LocalActorRefProvider$Guardian@72608760)
[DEBUG] [10/30/2013 08:40:30.447] [sparkExecutor-akka.actor.default-dispatcher-4] [akka://sparkExecutor/system] started (akka.actor.LocalActorRefProvider$SystemGuardian@1f57ea4a)
[DEBUG] [10/30/2013 08:40:30.454] [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now supervising Actor[akka://sparkExecutor/user]
[DEBUG] [10/30/2013 08:40:30.454] [sparkExecutor-akka.actor.default-dispatcher-2] [akka://sparkExecutor/] now supervising Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468] [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/user] now monitoring Actor[akka://sparkExecutor/system]
[DEBUG] [10/30/2013 08:40:30.468] [sparkExecutor-akka.actor.default-dispatcher-4] [akka://sparkExecutor/system] now monitoring Actor[akka://sparkExecutor/]
[DEBUG] [10/30/2013 08:40:30.476] [sparkExecutor-akka.actor.default-dispatcher-3] [akka://sparkExecutor/system/log1-Slf4jLogger] started (akka.event.slf4j.Slf4jLogger@24988707)
[DEBUG] [10/30/2013 08:40:30.477] [sparkExecutor-akka.actor.default-dispatcher-4] [akka://sparkExecutor/system] now supervising Actor[akka://sparkExecutor/system/log1-Slf4jLogger#719056881]

(followed by similar messages for the "spark" system)

I dunno if this means much more to you, but it seems to me that for some reason the executor decides to disconnect from the master -- unfortunately we don't know why.  I think my logging configuration is not getting applied correctly, or "log-sent-messages" & "log-received-messages" don't do what I think they do ... something conflicting must be turning that logging off.  There are a zillion different remoting settings:
http://doc.akka.io/docs/akka/snapshot/scala/remoting.html
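
(e.g. I'm wondering about things like the following, though I have no idea which of them, if any, actually control the logging of why a remote system disassociates:)

  -Dspark.akkaExtra.akka.remote.log-remote-lifecycle-events=on \
  -Dspark.akkaExtra.akka.log-dead-letters=on \
  -Dspark.akkaExtra.akka.loglevel=DEBUG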

I feel like I really need to get those messages on why it disconnected to know which ones to play with.  Any ideas for config changes to see those messages?

thanks




On Wed, Oct 30, 2013 at 10:09 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
Can you apply this patch too and check the logs of the driver and the worker?

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b6f0ec9..ad0ebf7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -132,7 +132,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
       if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
+        logInfo("Executor " + executorId + " disconnected, so removing it, reason:" + reason)
         val numCores = freeCores(executorId)
         actorToExecutorId -= executorActor(executorId)
         addressToExecutorId -= executorAddress(executorId)




On Wed, Oct 30, 2013 at 8:18 PM, Imran Rashid <imran@quantifind.com> wrote:
I just realized something about the failing stages -- they generally occur in steps like this:

rdd.mapPartitions{itr =>
  val myCounters = initializeSomeDataStructure()
  itr.foreach{
    //update myCounter in here
    ...
  }
 
  myCounters.iterator.map{
    //some other transformation here ...
  }
}

that is, as a partition is processed, nothing gets output, we just accumulate some values.  Only at the end of the partition do we output some accumulated values.

These stages don't always fail, and generally they do succeed after the executor has died and a new one has started -- so I'm pretty confident it's not a problem w/ the code.  But maybe we need to add something like a periodic heartbeat in this kind of operation?
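
(For diagnosis I'm thinking of something crude like the sketch below -- it doesn't send anything back to the driver, so it's not a real heartbeat, it would just show in the executor logs that the partition is still making progress.  The concrete names here are made up:)

rdd.mapPartitions { itr =>
  val counts = scala.collection.mutable.Map[String, Long]().withDefaultValue(0L)
  var processed = 0L
  itr.foreach { elem =>
    counts(elem.toString) += 1          // stand-in for "update myCounters in here"
    processed += 1
    if (processed % 100000 == 0) {      // crude sign of life, once per 100k records
      println("processed " + processed + " records in this partition")
    }
  }
  counts.iterator.map { case (k, v) => (k, v) }   // stand-in for the final transformation
}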



On Wed, Oct 30, 2013 at 8:56 AM, Imran Rashid <imran@quantifind.com> wrote:
I'm gonna try turning on more akka debugging msgs as described at
http://akka.io/faq/
and
http://doc.akka.io/docs/akka/current/scala/testing.html#Tracing_Actor_Invocations

unfortunately that will require a patch to spark, but hopefully that will give us more info to go on ...


On Wed, Oct 30, 2013 at 8:10 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
I have things running (from the scala-2.10 branch) for over 3-4 hours now without a problem, and my jobs write about the same amount of data as you described. My cluster size is 7 nodes and not *congested* for memory. I am going to leave jobs running all night. Meanwhile I would encourage you to try to narrow the problem down to something reproducible; that can help a ton in fixing the issue.

Thanks for testing and reporting your experience. I still feel there is something else wrong! About tolerance for network connection timeouts, setting those properties should work, but I am worried about the DisassociatedEvent though. I will have to check; this is indeed a hard bug to reproduce, if it is one. I mean, how do I simulate network delays?
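
One thing I might try, just a guess: adding artificial latency/loss on a worker's network interface with tc netem (assuming eth0 is the right interface there), e.g.:

tc qdisc add dev eth0 root netem delay 200ms loss 1%    # as root on a worker: add ~200ms latency and 1% loss
tc qdisc del dev eth0 root netem                        # remove it again afterwards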


On Wed, Oct 30, 2013 at 6:05 PM, Imran Rashid <imran@quantifind.com> wrote:
This is a spark-standalone setup (not mesos), on our own cluster.

At first I thought it must be some temporary network problem too -- but the times between receiving task completion events from an executor and declaring it failed are really small, so I didn't think that could possibly be it.  Plus we tried increasing various akka timeouts, but that didn't help.  Or maybe there are some other spark / akka properties we should be setting?  It certainly should be resilient to such a temporary network issue, if that is the problem.

btw, I think I've noticed this happens most often during ShuffleMapTasks.  The tasks write out very small amounts of data (64 MB total for the entire stage).

thanks

On Wed, Oct 30, 2013 at 6:47 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
Are you using mesos? I admit I have not properly tested things on mesos though.


On Wed, Oct 30, 2013 at 11:31 AM, Prashant Sharma <scrapcodes@gmail.com> wrote:
Those log messages are new in Akka 2.2 and are usually seen when a node is disassociated from another, either by a network failure or even a clean shutdown. This suggests some network issue to me; are you running on EC2? It might be a temporary thing in that case.

I would like more details on the long jobs though: how long?


On Wed, Oct 30, 2013 at 1:29 AM, Imran Rashid <imran@quantifind.com> wrote:
We've been testing out the 2.10 branch of spark, and we're running into some issues where akka disconnects from the executors after a while.  We ran some simple tests first, and all was well, so we started upgrading our whole codebase to 2.10.  Everything seemed to be working, but then we noticed that when we run long jobs, things start failing.


The first suspicious thing is that we get akka warnings about undeliverable messages sent to deadLetters:

2013-10-29 11:03:54,577 [spark-akka.actor.default-dispatcher-17] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:54,579 [spark-akka.actor.default-dispatcher-19] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%4010.10.5.81%3A46572-3#656094700] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.



Generally within a few seconds after the first such message, there are a bunch more, and then the executor is marked as failed, and a new one is started:

2013-10-29 11:03:58,775 [spark-akka.actor.default-dispatcher-3] INFO  akka.actor.LocalActorRef - Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://spark/deadLetters] to Actor[akka://spark/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkExecutor%40dhd2.quantifind.com%3A45794-6#-890135716] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

2013-10-29 11:03:58,778 [spark-akka.actor.default-dispatcher-17] INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor updated: app-20131029110000-0000/1 is now FAILED (Command exited with code 1)

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-17] INFO  org.apache.spark.deploy.client.Client$ClientActor - Executor added: app-20131029110000-0000/2 on worker-20131029105824-dhd2.quantifind.com-51544 (dhd2.quantifind.com:51544) with 24 cores

2013-10-29 11:03:58,784 [spark-akka.actor.default-dispatcher-18] ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://spark@ddd0.quantifind.com:43068] -> [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]: Error [Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]] [
akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@dhd2.quantifind.com:45794]
Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: dhd2.quantifind.com/10.10.5.64:45794]



Looking in the logs of the failed executor, there are some similar messages about undeliverable messages, but I don't see any reason for the disconnect:

13/10/29 11:03:52 INFO executor.Executor: Finished task ID 943

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.actor.FSM$Timer] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.AssociationHandle$Disassociated] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

13/10/29 11:03:53 ERROR executor.StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.

13/10/29 11:03:53 INFO actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkExecutor/deadLetters] to Actor[akka://sparkExecutor/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2Fspark%40ddd0.quantifind.com%3A43068-1#772172548] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.


After this happens, spark does launch a new executor successfully and continues the job.  Sometimes the job just continues happily and there aren't any other problems.  However, that executor may have to run a bunch of steps to re-compute some cached RDDs -- and during that time, another executor may crash similarly, and then we end up in a never-ending loop of one executor crashing, then trying to reload data, while the others sit around.

I have no idea what is triggering this behavior -- there isn't any particular point in the job where it regularly occurs.  Certain steps seem more prone to this, but there isn't any step which regularly causes the problem.  In a long pipeline of steps, though, that loop becomes very likely.  I don't think it's a timeout issue -- the initial failing executors can be actively completing stages just seconds before this failure happens.  We did try adjusting some of the spark / akka timeouts:

    -Dspark.storage.blockManagerHeartBeatMs=300000
    -Dspark.akka.frameSize=150
    -Dspark.akka.timeout=120
    -Dspark.akka.askTimeout=30
    -Dspark.akka.logLifecycleEvents=true

but those settings didn't seem to help the problem at all.  I figure it must be some configuration with the new version of akka that we're missing, but we haven't found anything.  Any ideas?

Our code works fine w/ the 0.8.0 release on scala 2.9.3.  The failures occur on the tip of the scala-2.10 branch (5429d62d).

thanks,
Imran


