spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From David Gomez Saavedra <mikr...@gmail.com>
Subject Re: Spark streaming with akka association with remote system failure
Date Thu, 17 Mar 2016 05:25:52 GMT
I solved the issue by configuring the same heartbeat interval and acceptable heartbeat pause in
both actor systems:

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on

  daemonic = on
  jvm-exit-on-fatal-error = off

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    default-dispatcher.throughput = 15
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    log-remote-lifecycle-events = on
    require-cookie = off
    secure-cookie = off
    netty.tcp {
      hostname = "spark-engine"
      port = 9083
      tcp-nodelay = on
      transport-class = "akka.remote.transport.netty.NettyTransport"
      connection-timeout = 120 s
      execution-pool-size = 4
    }

    transport-failure-detector {
      heartbeat-interval = 4 s
      acceptable-heartbeat-pause = 16 s
    }
  }
}


.set("spark.akka.heartbeat.interval", "4s")
.set("spark.akka.heartbeat.pauses", "16s")


On Tue, Mar 15, 2016 at 9:50 PM, David Gomez Saavedra <mikrodj@gmail.com>
wrote:

> hi there,
>
> I'm trying to set up a simple spark streaming app using akka actors as
> receivers. I followed the example provided and created two apps. One
> creating an actor system and another one subscribing to it. I can see the
> subscription message, but a few seconds later I get an error:
>
> [info] 20:37:40.296 [INFO ] Slf4jLogger started
> [info] 20:37:40.466 [INFO ] Starting remoting
> [info] 20:37:40.871 [INFO ] Remoting started; listening on addresses
> :[akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.876 [INFO ] Remoting now listens on addresses:
> [akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.913 [INFO ] starting actor on
> akka://spark-engine/user/integrationActor
> [info] received subscribe from Actor[akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036
> ]
> [info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out
> or transport failure detector triggered.
> [info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006] has failed, address is now
> gated for [5000] ms. Reason: [Disassociated]
> [info] received unsubscribe from Actor[akka.tcp://
> sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036
> ]
>
> I'm running the master and worker on docker. The two apps are running on
> my laptop for testing. Here's the code of both
>
> def main(args: Array[String]) {
>   val conf = new SparkConf()
>     .setMaster(sparkMaster)
>     .setAppName(sparkApp)
>     .set("spark.logConf", "true")
>     .set("spark.driver.port","7001")
>     .set("spark.fileserver.port","6002")
>     .set("spark.broadcast.port","6003")
>     .set("spark.replClassServer.port","6004")
>     .set("spark.blockManager.port","6005")
>     .set("spark.executor.port","6006")
>     .set("spark.akka.heartbeat.interval", "100")
>     .set("spark.akka.logLifecycleEvents", "true")
>     .set("spark.rpc.netty.dispatcher.numThreads","2")
>     .setJars(sparkJars)
>
>
>   val ssc = new StreamingContext(conf, Seconds(5))
>
>   ssc.checkpoint("/tmp")
>
>   val tags = ssc.actorStream [Tuple2[UUID, Tuple4[Set[String], Int, Int, Int]]] (Props(new
GifteeTagStreamingActor("akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
"TagsReceiver")
>
>   tags.print()
>
>   ssc.start()
>   ssc.awaitTermination()
>
> }
>
>
>
> def main(args: Array[String]) {
>
>   val config = ConfigFactory.load()
>   val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))
>
>   val integrationActor = system.actorOf(Props(new IntegrationActor()), "integrationActor")
>
>   log.info("starting actor on " + integrationActor.path)
>
>   system.awaitTermination()
>
> }
>
>
> This is my config for the remote actor system to where spark subscribes
>
> spark-engine {
>
>   akka {
>     loggers = ["akka.event.slf4j.Slf4jLogger"]
>     loglevel = DEBUG
>     logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
>     log-dead-letters = 10
>     log-dead-letters-during-shutdown = on
>
>     actor {
>       provider = "akka.remote.RemoteActorRefProvider"
>     }
>     remote {
>       enabled-transports = ["akka.remote.netty.tcp"]
>       log-remote-lifecycle-events = on
>       netty.tcp {
>         hostname = "spark-engine"
>         port = 9083
>       }
>     }
>   }
> }
>
> These are the logs from the executor
>
> 16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable
> 16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://spark-engine@spark-engine:9083]
has failed, address is now gated for [5000] ms. Reason: [Disassociated]
>
>
>
> Any idea why the two actor systems get disassociated?
>
> Thank you very much in advance.
>
> Best
> David
>

Mime
View raw message