I am not sure there is a quick fix for this, as the actor is started in the same actor system that Spark itself uses. And since that actor system is started as soon as the executor is launched, even before the application code is launched, there isn't much classloader magic that can be done.

However, I think a solution could be to create a new actor system for this purpose. It would be created when the receiver is started and stopped when the receiver exits. That should not be a big change, though the corner cases of shutting down and starting actor systems (as receivers get relaunched) and of configuring that actor system (as it is different from Spark's actor system) need to be handled carefully.
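A minimal sketch of that idea, assuming the Spark 1.x Receiver API and Akka 2.2/2.3 (the Akka line Spark 1.0 was built against). The receiver builds a private ActorSystem in onStart() from the class loader that loaded the receiver class itself, and shuts it down in onStop(). The class names, the field of CustomMessage, the system name "receiver-system" and the host/port settings are all hypothetical, and this is not a tested fix; the restart and configuration corner cases mentioned above are only partially covered.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical message type; in the thread it lives in the jar uploaded with setJars.
case class CustomMessage(text: String)

// Forwards every CustomMessage payload into Spark through the owning receiver.
class ForwardingActor(receiver: DedicatedSystemReceiver) extends Actor {
  def receive = {
    case CustomMessage(text) => receiver.store(text)
  }
}

// Sketch: a receiver that owns a private ActorSystem instead of using Spark's.
class DedicatedSystemReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Not serialized with the receiver; rebuilt on every (re)start.
  @transient private var system: ActorSystem = _

  override def onStart(): Unit = {
    // Assumption: the loader that loaded this class can see the application jar,
    // which, per the thread, is not true of the loader behind Spark's actor system.
    val loader = getClass.getClassLoader
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = $port
         |""".stripMargin
    ).withFallback(ConfigFactory.load(loader))
    system = ActorSystem("receiver-system", config, loader)
    system.actorOf(Props(classOf[ForwardingActor], this), "receiver")
  }

  override def onStop(): Unit = {
    if (system != null) {
      system.shutdown()          // Akka 2.2/2.3 API; later Akka versions use terminate()
      system.awaitTermination()  // release the port before a relaunched receiver binds it
      system = null
    }
  }
}
```

Remote senders would then address akka.tcp://receiver-system@<host>:<port>/user/receiver rather than an actor inside Spark's actor system. Waiting for termination in onStop() is one possible way to handle the relaunch case, since a restarted receiver may try to bind the same port.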

It's best to consider it a bug. Could you make a JIRA for it? And maybe fix it as well ;) ?

TD


On Tue, Sep 2, 2014 at 3:54 PM, Anton Brazhnyk <anton.brazhnyk@genesys.com> wrote:

It works with "spark.executor.extraClassPath": no exceptions in this case, and I’m getting the expected results.
But to me it limits/complicates the usage of Akka-based receivers a lot. Do you think it should be considered a bug?

Even if it’s not, can it be fixed/worked around by some classloading magic in either Spark or the application code?

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Friday, August 29, 2014 7:21 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

 

Can you try adding the JAR to the class path of the executors directly, by setting the config "spark.executor.extraClassPath" in the SparkConf? See the Configuration page: http://spark.apache.org/docs/latest/configuration.html#runtime-environment
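For reference, a launcher configuration along these lines might look like the following sketch. The object name, master URL and jar path are hypothetical placeholders. It assumes the jar has already been copied to the same local path on every worker, because spark.executor.extraClassPath only adds that path to the executor JVM's class path and does not ship the file.

```scala
import org.apache.spark.SparkConf

object LauncherConf {
  // Hypothetical values; substitute your own master URL and jar location.
  val masterUrl = "spark://master-host:7077"
  val appJar = "/opt/app/actor-wordcount.jar"

  def build(): SparkConf =
    new SparkConf()
      .setAppName("ActorWordCount")
      .setMaster(masterUrl)
      // Ships the jar to executors; its classes are added after Spark's actor system is up.
      .setJars(Seq(appJar))
      // Puts the jar on the executor JVM class path at launch, so the application class
      // loader (sun.misc.Launcher$AppClassLoader in the traces in this thread) can find it.
      .set("spark.executor.extraClassPath", appJar)
}
```

The resulting SparkConf would then be passed to new StreamingContext(conf, Seconds(n)) as in the ActorWordCount example.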

 

I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJars.
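A dependency-free illustration of the mechanism: Java deserialization resolves every class name in the stream through some class loader, and Akka's ClassLoaderObjectInputStream (visible in the stack traces in this thread) overrides resolveClass for exactly that purpose. The sketch uses a hypothetical FixedLoaderObjectInputStream that always resolves through one given loader. Deserializing with a loader that cannot see the class fails with the same ClassNotFoundException, while the loader that defined the class succeeds.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream,
  ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.net.{URL, URLClassLoader}

// Stand-in for the message class from the uploaded jar (the field is hypothetical).
case class CustomMessage(text: String)

// Resolves every class in the stream through one fixed loader, with no fallback.
class FixedLoaderObjectInputStream(loader: ClassLoader, in: InputStream)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    Class.forName(desc.getName, false, loader)
}

object ClassLoaderDemo {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte], loader: ClassLoader): AnyRef = {
    val in = new FixedLoaderObjectInputStream(loader, new ByteArrayInputStream(data))
    try in.readObject() finally in.close()
  }

  // The loader that defined CustomMessage: resolution succeeds.
  val definingLoader: ClassLoader = classOf[CustomMessage].getClassLoader

  // Parent is the bootstrap loader: it sees java.* but no application classes.
  val blindLoader: ClassLoader = new URLClassLoader(Array.empty[URL], null)
}
```

In the thread's setup, the loader available to Akka on the executor appears to be the one in place before setJars delivered the application jar, which corresponds to the failing case here.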

 

TD

On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk <anton.brazhnyk@genesys.com> wrote:

Just checked it with 1.0.2.

Still the same exception.

 

From: Anton Brazhnyk [mailto:anton.brazhnyk@genesys.com]
Sent: Wednesday, August 27, 2014 6:46 PM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

 

Sorry for the delayed answer; I was on vacation.

As I said, I was using a modified version of the launcher from the example. The only modification is setting the Spark master URL in the code, so that the run-example script is not needed.

The launcher itself was in the attached zip (attaching it once more) as the ActorWordCount object.

 

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Tuesday, August 05, 2014 11:32 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

 

How are you launching/submitting the program? Using spark-submit? Or some other script (can you provide that)?

 

TD

 

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk <anton.brazhnyk@genesys.com> wrote:

Went through it once again, leaving only the modification in question. Still the same exception.
I hope sources as a zip file (instead of GitHub) can still be tolerated. :)

Here is the stacktrace generated with these sources:
14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800
14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
        at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-----Original Message-----
From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Tuesday, August 05, 2014 5:42 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

Can you show us the modified version? The reason could very well be what you suggest, but I want to understand what conditions lead to this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk <anton.brazhnyk@genesys.com> wrote:
> Greetings,
>
>
>
> I modified the ActorWordCount example a little, so that it uses a simple case
> class as the message for Streaming instead of a primitive string.
>
> I also modified the launch code to not use the run-example script, but to set
> the Spark master in the code and attach the jar (setJars(…)) with all the
> classes, including the new case class. It runs fine in local[*] mode
> but fails with ClassNotFoundException in a standalone cluster (stacktrace follows).
>
>
>
> I assume it’s a classloader problem and Akka remoting just doesn’t
> know about the classes coming to the executor from the attached jar.
> Am I right?
>
>
>
> I guess I could pass primitive values around and do my own
> (de)serialization, but maybe there is a better way?
>
> What’s the correct way to build a custom Akka-based receiver that uses
> non-primitive messages?
>
>
>
>
>
> Here is the log excerpt with stacktrace:
>
> 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407211181800
> 14/08/04 20:59:41 ERROR Remoting: com.genesys.gpe.analytics.akka.messages.SubscribeAck
> java.lang.ClassNotFoundException: com.genesys.gpe.analytics.akka.messages.SubscribeAck
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
>         at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>         at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>         at scala.util.Try$.apply(Try.scala:161)
>         at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>         at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>         at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> WBR,
>
> Anton
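Anton's fallback idea of passing only primitive values and doing the (de)serialization himself could look like the sketch below. A hypothetical WordEvent message is flattened into a single String before it is sent and rebuilt inside the receiver. Because only java.lang.String crosses the wire, the remote deserializer never has to resolve a class from the application jar. The trade-off is a hand-written format (here a '|'-separated pair, so words containing '|' are rejected) that both sides must keep in sync.

```scala
import scala.util.Try

// Hypothetical message type standing in for CustomMessage / SubscribeAck.
case class WordEvent(word: String, count: Int)

// Flattens WordEvent to a String on the sender side and rebuilds it in the receiver.
object WordEventCodec {
  private val Sep = '|'

  def encode(e: WordEvent): String = {
    require(e.word.indexOf(Sep) < 0, s"word must not contain '$Sep'")
    s"${e.word}$Sep${e.count}"
  }

  // Returns None for anything that is not exactly "<word>|<int>".
  def decode(s: String): Option[WordEvent] = s.split(Sep) match {
    case Array(word, count) => Try(count.toInt).toOption.map(WordEvent(word, _))
    case _                  => None
  }
}
```

On the receiving side, the actor would match on String and hand decode(...) results to Spark, so the case class is only ever instantiated by code loaded from the application jar.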