spark-user mailing list archives

From Robin Keunen <robin.keu...@lampiris.be>
Subject Re: Serializing with Kryo NullPointerException - Java
Date Thu, 04 Dec 2014 08:32:13 GMT
Using

         <dependency>
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo-shaded</artifactId>
             <version>3.0.0</version>
         </dependency>

instead of

         <dependency>
             <groupId>com.esotericsoftware.kryo</groupId>
             <artifactId>kryo</artifactId>
             <version>2.24.0</version>
         </dependency>

fixed the NullPointerException.
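
For anyone hitting the same trace: the thread does not establish why swapping the artifact helps. One possible explanation, and it is only an assumption, is a clash between the Kryo jar added to the project and the one Spark already brings in through chill (the trace goes through com.twitter.chill). A rough way to check which jar a class is really loaded from is sketched here. The class name ClassOrigin and the method locationOf are made up for illustration; only standard JDK calls are used.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports which jar or directory a class was loaded from. */
public class ClassOrigin {

    /** Returns the code-source location of the named class, or a short marker if unavailable. */
    static String locationOf(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (e.g. java.lang.String) have no code source
            if (source == null || source.getLocation() == null) {
                return "<no code source>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the quoted message
        String[] names = {
            "com.esotericsoftware.kryo.Kryo",
            "com.twitter.chill.WrappedArraySerializer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Running this with the same classpath as the driver, or calling locationOf from inside a task, should show whether the Kryo class comes from the jar you expect. It only reports the first match the class loader finds, so it will not list every duplicate on the classpath.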

On 2014-12-03 18:15, Robin Keunen wrote:
> Hi all,
>
> I am having trouble using Kryo and, being new to this kind of
> serialization, I am not sure where to look. Can someone please help
> me? :-)
>
> Here is my custom class:
>
> public class DummyClass implements KryoSerializable {
>     private static final Logger LOGGER = LoggerFactory.getLogger(DummyClass.class);
>     int value;
>
>     public DummyClass() {
>     }
>
>     public DummyClass(int value) {
>         LOGGER.info("hey I'm dum {}!", value);
>         this.value = value;
>     }
>
>     public int getValue() {
>         return value;
>     }
>
>     public void setValue(int value) {
>         this.value = value;
>     }
>
>     @Override
>     public void write(Kryo kryo, Output output) {
>         output.writeInt(value);
>     }
>
>     @Override
>     public void read(Kryo kryo, Input input) {
>         this.value = input.readInt();
>     }
> }
>
> Here is my registrator:
>
> public class MyKryoRegistrator implements KryoRegistrator {
>     @Override
>     public void registerClasses(Kryo kryo) {
>         kryo.register(DummyClass.class);
>     }
> }
>
> And the Spark code:
> SparkConf sparkConf = new SparkConf()
>         .setAppName(appName)
>         .setMaster(master)
>         .setJars(jars)
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.kryo.registrator", "org.roke.main.MyKryoRegistrator");
>
> JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
> List<DummyClass> dummyClasses = Arrays.asList(
>         new DummyClass(1),
>         new DummyClass(2),
>         new DummyClass(3),
>         new DummyClass(4)
> );
>
> JavaRDD<DummyClass> rdd = sparkContext.parallelize(dummyClasses);
> for (DummyClass dummyClass: rdd.collect())
>     LOGGER.info("driver collected {}", dummyClass);
>
> The program fails with the following NullPointerException:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.21.6.68): java.lang.NullPointerException:
>         com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
>         com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>         org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
>         org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
>         org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
>         org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
>         sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         java.lang.reflect.Method.invoke(Method.java:606)
>         java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>         org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
>
> -- 
>
> Robin Keunen
> Software Engineer
> robin.keunen@lampiris.be
> www.lampiris.be


