spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: bug using kryo as closure serializer
Date Mon, 05 May 2014 03:48:37 GMT
Good idea. I submitted a pull request for the doc update here:
https://github.com/apache/spark/pull/642


On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth <soren@yieldbot.com> wrote:

> Thanks for the reply!
>
> Ok, if that's the case, I'd recommend a note to that effect in the docs at
> least.
>
> Just to give some more context here, I'm working on a Clojure DSL for Spark
> called Flambo, which I plan to open source shortly. If I could I'd like to
> focus on the initial bug that I hit.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set final scala.collection.convert.Wrappers field scala.collection.convert.Wrappers$SeqWrapper.$outer to clojure.lang.PersistentVector
> Serialization trace:
> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> This happens immediately after all the tasks of a reduce stage complete
> successfully. Here is the function throwing the exception:
>
>
> https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43
>
> This is where I get lost. From googling around, it seems that Scala is
> trying to wrap the result of my task, which contains
> clojure.lang.PersistentVector objects, in a Scala collection, but I don't
> know why it's doing that. I have a registered Kryo serializer for
> clojure.lang.PersistentVector.
>
> Based on this line, it looks like it's trying to use the closure serializer,
> yet the exception thrown is a com.esotericsoftware.kryo.KryoException:
>
>
> https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39
>
> Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
> trying to deal with my clojure.lang.PersistentVector class?
>
> Where do I go from here?
>
>
> On Sun, May 4, 2014 at 12:50 PM, Reynold Xin <rxin@databricks.com> wrote:
>
> > I added the config option to use the non-default serializer. However, at
> > the time, Kryo failed to serialize pretty much any closure, so that option
> > was never really used / recommended.
> >
> > Since then the Scala ecosystem has developed, and some other projects are
> > starting to use Kryo to serialize more Scala data structures, so I
> > wouldn't be surprised if there is a way to work around this now. However,
> > I don't have enough time to look into it at this point. If you do, please
> > do post your findings. Thanks.
> >
> >
> >
> > On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth <soren@yieldbot.com> wrote:
> >
> > > Apologies for the cross-list posts, but I've gotten zero response on the
> > > user list and I guess this list is probably more appropriate.
> > >
> > > According to the documentation, using the KryoSerializer for closures is
> > > supported. However, when I try to set `spark.closure.serializer` to
> > > `org.apache.spark.serializer.KryoSerializer`, things fail pretty
> > > miserably.
> > >
> > > The first thing that happens is that it throws exceptions over and over
> > > saying that it cannot locate my registrator class, which is located in my
> > > assembly jar, like so:
> > >
> > > 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run spark.kryo.registrator
> > > java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > > at java.lang.Class.forName0(Native Method)
> > > at java.lang.Class.forName(Class.java:270)
> > > at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
> > > at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
> > > at scala.Option.foreach(Option.scala:236)
> > > at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
> > > at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:116)
> > > at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
> > > at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
> > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:415)
> > > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > > at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > at java.lang.Thread.run(Thread.java:724)
> > >
> > > Now, I would expect it not to be able to find this class since it hasn't
> > > yet fetched my assembly jar to the executors. Once it does fetch my jar,
> > > those exceptions stop. Next, all the executor tasks die with the
> > > following exception:
> > >
> > > java.nio.ReadOnlyBufferException
> > > at java.nio.ByteBuffer.array(ByteBuffer.java:961)
> > > at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
> > > at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
> > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
> > > at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at javax.security.auth.Subject.doAs(Subject.java:415)
> > > at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> > > at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
> > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
> > > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > > at java.lang.Thread.run(Thread.java:724)
> > >
> > > AFAIK, I'm not doing anything out of the ordinary, just turning on kryo
> > > and using the registrator mechanism to register a couple custom
> > > serializers.
> > >
> > > The reason I tried turning on kryo for closure in the first place is
> > > because of a different bug that I was hitting during fetching and
> > > deserializing of tasks from my executors, which I detailed here:
> > >
> > > http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html
> > >
> > > Here's hoping someone on this list can help me track down what's
> > > happening, as I didn't get a single reply on the user list.
> > >
> >
>
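
Editorial note on the java.nio.ReadOnlyBufferException quoted in the
original report: the top frame of that trace is ByteBuffer.array(), which
the JDK documents as throwing ReadOnlyBufferException when the buffer is
backed by an array but is read-only. The sketch below reproduces only that
JDK behavior and shows one generic way to read the bytes without calling
array(). The names ReadOnlyBufferDemo, arrayThrows and copyRemaining are
hypothetical, and this is not a description of how Spark's KryoSerializer
was or should be changed.

```java
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// Hypothetical demo class; not part of Spark or Kryo.
class ReadOnlyBufferDemo {

    /** Returns true if calling array() on the buffer throws ReadOnlyBufferException. */
    static boolean arrayThrows(ByteBuffer buf) {
        try {
            buf.array();
            return false;
        } catch (ReadOnlyBufferException e) {
            return true;
        }
    }

    /**
     * Copies the remaining bytes into a fresh array. Works on read-only
     * buffers because it never touches the backing array, and it reads
     * through a duplicate so the caller's position is left unchanged.
     */
    static byte[] copyRemaining(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer writable = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer readOnly = writable.asReadOnlyBuffer();

        System.out.println("writable.array() throws: " + arrayThrows(writable));
        System.out.println("readOnly.array() throws: " + arrayThrows(readOnly));
        System.out.println("bytes copied from read-only view: "
                + copyRemaining(readOnly).length);
    }
}
```

Whether the buffer handed to the deserializer in the report was read-only
for this reason is an assumption drawn from the trace alone; the sketch
only shows why array() can fail where a bulk get() succeeds.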
