spark-user mailing list archives

From Yadid Ayzenberg <ya...@media.mit.edu>
Subject Re: java.io.NotSerializableException on RDD count() in Java
Date Mon, 04 Nov 2013 01:13:16 GMT
I'm running in local[4] mode, so there are no slave machines. Full stack
trace:

[error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

When I add "implements Serializable" to my class, I get the following
stack trace:

[error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
[debug]     Thread run-main exited.
[debug] Interrupting remaining threads (should be all daemons).
[debug] Sandboxed run complete..
java.lang.RuntimeException: Nonzero exit code: 1
     at scala.sys.package$.error(package.scala:27)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
     at scala.Option.foreach(Option.scala:236)
     at sbt.BuildCommon$class.toError(Defaults.scala:1628)
     at sbt.Defaults$.toError(Defaults.scala:34)
     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
     at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
     at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
     at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
     at sbt.std.Transform$$anon$4.work(System.scala:64)
     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
     at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
     at sbt.Execute.work(Execute.scala:244)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
     at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
     at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
     at java.lang.Thread.run(Thread.java:695)

I can post my code if that helps.
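
In the meantime, here is one restructuring that should avoid the capture entirely (a sketch only; the FirstElement name is illustrative, not my actual code). An anonymous inner class keeps an implicit reference to its enclosing instance, so Java serialization tries to pull in AnalyticsEngine and, once that is Serializable, its JavaSparkContext field as well. A top-level (or static nested) function class carries no such reference:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.bson.BSONObject;
import scala.Tuple2;

// No hidden reference to the enclosing AnalyticsEngine instance, so only
// this small object is serialized with the task. (Written against the
// Spark 0.8 Java API, where FlatMapFunction is an abstract class.)
public class FirstElement
        extends FlatMapFunction<Tuple2<Object, BSONObject>, Double> {
    @Override
    public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
        BSONObject doc = e._2();
        List<List<Double>> vals = (List<List<Double>>) doc.get("data");
        List<Double> results = new ArrayList<Double>();
        for (List<Double> row : vals)
            results.add(row.get(0));  // keep only the first element of each array
        return results;
    }
}

// Usage inside AnalyticsEngine:
// JavaRDD<Double> rdd2 = rdd.flatMap(new FirstElement());

Alternatively, keeping the anonymous class but declaring the JavaSparkContext field transient in AnalyticsEngine should get past this particular exception, though the outer object would still be shipped with every task.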


On 11/3/13 8:05 PM, Patrick Wendell wrote:
> If you look in the UI, are there failures on any of the slaves that
> you can give a stack trace for? That would narrow down where the
> serialization error is happening.
>
> Unfortunately this code path doesn't print a full stack trace which
> makes it harder to debug where the serialization error comes from.
>
> Could you post all of your code?
>
> Also, just wondering, what happens if you just go ahead and add
> "implements Serializable" to the AnalyticsEngine class? It's possible this is
> happening during closure serialization, which uses the closure
> serializer (by default, Java serialization).
>
> - Patrick
>
> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>> Yes, I tried that as well (it is currently registered with Kryo), although
>> it doesn't make sense to me (and doesn't solve the problem). I also made sure
>> my registration was running:
>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user registrator: edu.mit.bsense.MyRegistrator
>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG org.apache.spark.serializer.KryoSerializer  - Running user registrator: edu.mit.bsense.MyRegistrator
>>
>> edu.mit.bsense.AnalyticsEngine is the class containing the SC which
>> instantiates the RDDs and runs the map() and count().
>> Can you explain why it needs to be serialized?
>>
>> Also, when running count() on my original RDD (pre-map) I get the right
>> answer - this means the classes of the data in the RDD are serializable.
>> It's only when I run map() and then count() on the new RDD that I get this
>> exception. My map does not introduce any new classes; it just iterates over
>> the existing data.
>>
>> Any ideas?
>>
>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>>> edu.mit.bsense.AnalyticsEngine
>>>
>>> Look at the exception. Basically, you'll need to register every class
>>> type that is recursively used by BSONObject.
>>>
>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <yadid@media.mit.edu>
>>> wrote:
>>>> Hi Patrick,
>>>>
>>>> I am in fact using Kryo, and I'm registering BSONObject.class (which is the
>>>> class holding the data) in my KryoRegistrator.
>>>> I'm not sure what other classes I should be registering.
>>>>
>>>> Thanks,
>>>>
>>>> Yadid
>>>>
>>>>
>>>>
>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>>>>> The problem is you are referencing a class that does not implement
>>>>> Serializable in the data that you shuffle. Spark needs to send all
>>>>> shuffle data over the network, so it needs to know how to serialize
>>>>> it.
>>>>>
>>>>> One option is to use Kryo for network serialization as described here
>>>>> - you'll have to register all the classes that get serialized, though.
>>>>>
>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>>>>>
>>>>> Another option is to write a wrapper class that implements
>>>>> Externalizable and write the serialization yourself.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <yadid@media.mit.edu>
>>>>> wrote:
>>>>>> Hi All,
>>>>>>
>>>>>> My original RDD contains arrays of doubles. When applying a count()
>>>>>> operator to the original RDD I get the result as expected.
>>>>>> However, when I run a map on the original RDD in order to generate a new
>>>>>> RDD with only the first element of each array, and try to apply count() to
>>>>>> the newly generated RDD, I get the following exception:
>>>>>>
>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  - Failed to run count at AnalyticsEngine.java:133
>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>> org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>>>>>>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>>>>>>        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>>>>>>        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>>>>>>        at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>>>>>>
>>>>>>
>>>>>> If I run a take() operation on the new RDD I receive the results as
>>>>>> expected. Here is my code:
>>>>>>
>>>>>>
>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap(
>>>>>>     new FlatMapFunction<Tuple2<Object, BSONObject>, Double>() {
>>>>>>         @Override
>>>>>>         public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>>>>>>             BSONObject doc = e._2();
>>>>>>             List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>>>>>>             List<Double> results = new ArrayList<Double>();
>>>>>>             for (int i = 0; i < vals.size(); i++)
>>>>>>                 results.add((Double) vals.get(i).get(0));
>>>>>>             return results;
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> logger.info("Take: {}", rdd2.take(100));
>>>>>> logger.info("Count: {}", rdd2.count());
>>>>>>
>>>>>>
>>>>>> Any ideas on what I am doing wrong?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yadid
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Yadid Ayzenberg
>>>>>> Graduate Student and Research Assistant
>>>>>> Affective Computing
>>>>>> Phone: 617-866-7226
>>>>>> Room: E14-274G
>>>>>> MIT Media Lab
>>>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>>>
>>>>>>
>>>>>>
>>>> --
>>>> Yadid Ayzenberg
>>>> Graduate Student and Research Assistant
>>>> Affective Computing
>>>> Phone: 617-866-7226
>>>> Room: E14-274G
>>>> MIT Media Lab
>>>> 75 Amherst st, Cambridge, MA, 02139
>>>>
>>>>
>>>>
>>
>> --
>> Yadid Ayzenberg
>> Graduate Student and Research Assistant
>> Affective Computing
>> Phone: 617-866-7226
>> Room: E14-274G
>> MIT Media Lab
>> 75 Amherst st, Cambridge, MA, 02139
>>
>>
>>
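
For completeness, the registrator under discussion looks, in outline, like the following. This is a sketch against the Spark 0.8 KryoRegistrator API; only the MyRegistrator name and the BSONObject registration come from this thread, and the concrete classes registered are illustrative guesses at what the records contain:

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import org.bson.BasicBSONObject;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register the concrete types that actually appear in the records
        // (BSONObject itself is only an interface), plus the containers
        // nested inside the "data" field.
        kryo.register(BasicBSONObject.class);
        kryo.register(java.util.ArrayList.class);
    }
}

It is wired up through system properties before the SparkContext is created, e.g. System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") and System.setProperty("spark.kryo.registrator", "edu.mit.bsense.MyRegistrator"). Note that Kryo here only covers data serialization; task closures still go through Java serialization by default, which would explain why the exceptions above name the enclosing class rather than BSONObject.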


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139



