spark-user mailing list archives

From: David Gomez Saavedra <mikr...@gmail.com>
Subject: Spark streaming with akka association with remote system failure
Date: Tue, 15 Mar 2016 20:50:16 GMT
Hi there,

I'm trying to set up a simple Spark Streaming app using Akka actors as
receivers. I followed the example provided and created two apps: one that
creates an actor system, and another that subscribes to it. I can see the
subscription message, but a few seconds later I get an error:

[info] 20:37:40.296 [INFO ] Slf4jLogger started
[info] 20:37:40.466 [INFO ] Starting remoting
[info] 20:37:40.871 [INFO ] Remoting started; listening on addresses :[akka.tcp://spark-engine@spark-engine:9083]
[info] 20:37:40.876 [INFO ] Remoting now listens on addresses: [akka.tcp://spark-engine@spark-engine:9083]
[info] 20:37:40.913 [INFO ] starting actor on akka://spark-engine/user/integrationActor
[info] received subscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]
[info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out or transport failure detector triggered.
[info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[info] received unsubscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]

I'm running the master and worker on Docker. The two apps are running on my
laptop for testing. Here's the code for both. First the streaming driver:

import java.util.UUID

import akka.actor.Props
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]) {
  // sparkMaster, sparkApp and sparkJars are defined elsewhere in the app
  val conf = new SparkConf()
    .setMaster(sparkMaster)
    .setAppName(sparkApp)
    .set("spark.logConf", "true")
    // fixed ports so the Dockerized master/worker and my laptop can reach each other
    .set("spark.driver.port", "7001")
    .set("spark.fileserver.port", "6002")
    .set("spark.broadcast.port", "6003")
    .set("spark.replClassServer.port", "6004")
    .set("spark.blockManager.port", "6005")
    .set("spark.executor.port", "6006")
    .set("spark.akka.heartbeat.interval", "100")
    .set("spark.akka.logLifecycleEvents", "true")
    .set("spark.rpc.netty.dispatcher.numThreads", "2")
    .setJars(sparkJars)

  val ssc = new StreamingContext(conf, Seconds(5))

  ssc.checkpoint("/tmp")

  // actor-based receiver that subscribes to the remote integrationActor
  val tags = ssc.actorStream[(UUID, (Set[String], Int, Int, Int))](
    Props(new GifteeTagStreamingActor(
      "akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
    "TagsReceiver")

  tags.print()

  ssc.start()
  ssc.awaitTermination()
}
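
(GifteeTagStreamingActor isn't shown here; it follows the SampleActorReceiver
pattern from Spark's ActorWordCount example. A minimal sketch of the assumed
shape — the SubscribeReceiver/UnsubscribeReceiver messages and the body below
come from that example, not from my actual code:)

import java.util.UUID

import akka.actor.{Actor, ActorRef}
import org.apache.spark.streaming.receiver.ActorHelper

// subscription protocol shared by receiver and publisher,
// as in Spark's ActorWordCount example
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)

// runs inside a Spark executor: subscribes to the remote publisher on
// startup and hands every received element to Spark via store()
class GifteeTagStreamingActor(publisherUrl: String)
  extends Actor with ActorHelper {

  private lazy val remotePublisher = context.actorSelection(publisherUrl)

  override def preStart(): Unit =
    remotePublisher ! SubscribeReceiver(context.self)

  def receive = {
    case tags: (UUID, (Set[String], Int, Int, Int)) @unchecked => store(tags)
  }

  override def postStop(): Unit =
    remotePublisher ! UnsubscribeReceiver(context.self)
}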



And here is the standalone app that hosts the publisher actor:

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

def main(args: Array[String]) {
  // load the "spark-engine" section of the config shown below
  val config = ConfigFactory.load()
  val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))

  // publisher actor that the Spark receiver subscribes to
  val integrationActor = system.actorOf(Props(new IntegrationActor()), "integrationActor")

  log.info("starting actor on " + integrationActor.path)

  system.awaitTermination()
}
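
(IntegrationActor isn't shown either; it's modeled on the FeederActor from
the same example. A minimal sketch of the assumed shape — my real actor does
more than this:)

import java.util.UUID

import scala.collection.mutable

import akka.actor.{Actor, ActorLogging, ActorRef}

// lives in the standalone "spark-engine" actor system: keeps track of
// subscribed Spark receivers and pushes tag tuples out to them
class IntegrationActor extends Actor with ActorLogging {

  private val receivers = mutable.LinkedHashSet.empty[ActorRef]

  def receive = {
    case SubscribeReceiver(receiverActor) =>
      log.info("received subscribe from " + receiverActor)
      receivers += receiverActor

    case UnsubscribeReceiver(receiverActor) =>
      log.info("received unsubscribe from " + receiverActor)
      receivers -= receiverActor

    case tags: (UUID, (Set[String], Int, Int, Int)) @unchecked =>
      receivers.foreach(_ ! tags)
  }
}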


This is the config for the remote actor system that Spark subscribes to:

spark-engine {

  akka {
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    loglevel = DEBUG
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    log-dead-letters = 10
    log-dead-letters-during-shutdown = on

    actor {
      provider = "akka.remote.RemoteActorRefProvider"
    }
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      log-remote-lifecycle-events = on
      netty.tcp {
        hostname = "spark-engine"
        port = 9083
      }
    }
  }
}

These are the logs from the executor:

16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://spark-engine@spark-engine:9083] has failed, address is now gated for [5000] ms. Reason: [Disassociated]



Any idea why the two actor systems get disassociated?

Thank you very much in advance.

Best
David
