spark-user mailing list archives

From David Gomez Saavedra <>
Subject Spark streaming with akka association with remote system failure
Date Tue, 15 Mar 2016 20:50:16 GMT
hi there,

I'm trying to set up a simple Spark Streaming app using Akka actors as
receivers. I followed the example provided and created two apps: one
creating an actor system and another one subscribing to it. I can see the
subscription message, but a few seconds later I get an error:

[info] 20:37:40.296 [INFO ] Slf4jLogger started
[info] 20:37:40.466 [INFO ] Starting remoting
[info] 20:37:40.871 [INFO ] Remoting started; listening on addresses
[info] 20:37:40.876 [INFO ] Remoting now listens on addresses:
[info] 20:37:40.913 [INFO ] starting actor on
[info] received subscribe from Actor[akka.tcp://
[info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out or
transport failure detector triggered.
[info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://
sparkExecutorActorSystem@] has failed, address is now gated
for [5000] ms. Reason: [Disassociated]
[info] received unsubscribe from Actor[akka.tcp://

I'm running the master and worker in Docker. The two apps are running on my
laptop for testing. Here's the code of both:

def main(args: Array[String]) {
  val conf = new SparkConf()
    .set("spark.logConf", "true")
    .set("spark.akka.heartbeat.interval", "100")
    .set("spark.akka.logLifecycleEvents", "true")

  val ssc = new StreamingContext(conf, Seconds(5))


  val tags = ssc.actorStream[(UUID, (Set[String], Int, Int, Int))](Props(new
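Since the snippet is cut off at this point, here is a minimal, self-contained sketch of what the driver side could look like. The receiver actor class (`TagReceiver`), the publisher URL, the app name, and the output operation are all assumptions for illustration, not the original code:

```scala
import java.util.UUID

import akka.actor.{Actor, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper

object StreamingApp {
  // Alias for the message type used in the stream
  type TagMsg = (UUID, (Set[String], Int, Int, Int))

  // Hypothetical receiver actor: subscribes to the remote publisher
  // and hands incoming messages to Spark via ActorHelper.store()
  class TagReceiver(publisherUrl: String) extends Actor with ActorHelper {
    override def preStart(): Unit =
      context.actorSelection(publisherUrl) ! "subscribe"

    def receive = {
      case msg: TagMsg @unchecked => store(msg) // push into the DStream
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("tag-stream") // assumption: app name
      .set("spark.logConf", "true")
    val ssc = new StreamingContext(conf, Seconds(5))

    // URL of the remote publisher actor (host/port taken from the
    // spark-engine config shown later in this message)
    val url = "akka.tcp://spark-engine@spark-engine:9083/user/integrationActor"

    val tags = ssc.actorStream[TagMsg](Props(new TagReceiver(url)), "tag-receiver")

    tags.print()          // some output operation, so the stream is materialized
    ssc.start()           // without start()/awaitTermination() nothing runs
    ssc.awaitTermination()
  }
}
```

Note the explicit `ssc.start()` / `ssc.awaitTermination()` at the end; if the driver returns from `main` without them, the receiver is torn down and the remote side sees a disassociation.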




def main(args: Array[String]) {

  val config = ConfigFactory.load()
  val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))

  val integrationActor = system.actorOf(Props(new IntegrationActor()),
    "integrationActor")

  system.log.info("starting actor on " + integrationActor.path)

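For completeness, here is a sketch of how the `IntegrationActor` on the publisher side might handle these messages. The string-based subscribe/unsubscribe protocol and the subscriber set are assumptions inferred from the log lines above, not the original implementation:

```scala
import akka.actor.{Actor, ActorRef}

class IntegrationActor extends Actor {
  // Spark receivers currently subscribed to this publisher
  private var subscribers = Set.empty[ActorRef]

  def receive = {
    case "subscribe" =>
      context.system.log.info("received subscribe from " + sender())
      subscribers += sender()
    case "unsubscribe" =>
      context.system.log.info("received unsubscribe from " + sender())
      subscribers -= sender()
    case data =>
      // Fan any other message out to all subscribed Spark receivers
      subscribers.foreach(_ ! data)
  }
}
```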


This is my config for the remote actor system to which Spark subscribes:

spark-engine {

  akka {
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = DEBUG
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    log-dead-letters = 10
    log-dead-letters-during-shutdown = on

    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }

    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      log-remote-lifecycle-events = on
      netty.tcp {
        hostname = "spark-engine"
        port = 9083
      }
    }
  }
}

These are the logs from the executor

16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with
remote system [akka.tcp://spark-engine@spark-engine:9083] has failed,
address is now gated for [5000] ms. Reason: [Disassociated]

Any idea why the two actor systems get disassociated?

Thank you very much in advance.

