spark-user mailing list archives

From Sean Owen <so...@cloudera.com>
Subject Re: unable to write SequenceFile using saveAsNewAPIHadoopFile
Date Thu, 22 Jan 2015 08:58:36 GMT
First, as an aside: I'm pretty sure you can't reuse a single Text and
IntWritable object here, because Spark does not necessarily finish with one
call()'s value before the next call(). And although that object reuse
shouldn't be directly related to the serialization problem, I suspect it is
the cause here: your function is not serializable because it holds
references to these cached writables. I think removing them fixes both
problems.
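To illustrate, here's a rough, untested sketch of what I mean, using the
names from your snippet below: no Writable fields on the function, and a
fresh pair of objects per record.

    joiningKeyPlusPredictedPoint.mapToPair(
            new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
                @Override
                public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> tuple)
                        throws Exception {
                    // Fresh Text/IntWritable per record: safe even if Spark
                    // still holds the previous value, and the function no
                    // longer drags non-serializable fields into the closure.
                    return new Tuple2<Text, IntWritable>(
                            new Text(tuple._1), new IntWritable(tuple._2));
                }
            })
            .saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
                    Text.class, IntWritable.class, SequenceFileOutputFormat.class);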
On Jan 22, 2015 9:42 AM, "Skanda" <skanda.ganapathy@gmail.com> wrote:

> Hi All,
>
> I'm using the saveAsNewAPIHadoopFile API to write SequenceFiles but I'm
> getting the following runtime exception:
>
> *Exception in thread "main" org.apache.spark.SparkException: Task not serializable*
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>     at org.apache.spark.rdd.RDD.map(RDD.scala:271)
>     at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:102)
>     at org.apache.spark.api.java.JavaPairRDD.mapToPair(JavaPairRDD.scala:45)
>     at XoanonKMeansV2.main(XoanonKMeansV2.java:125)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> *Caused by: java.io.NotSerializableException: org.apache.hadoop.io.IntWritable*
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>     ... 13 more
>
> Please find the code snippet below:
>
> joiningKeyPlusPredictedPoint.mapToPair(
>         new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
>             Text text = new Text();
>             IntWritable intwritable = new IntWritable();
>
>             @Override
>             public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> tuple)
>                     throws Exception {
>                 text.set(tuple._1);
>                 intwritable.set(tuple._2);
>                 return new Tuple2<Text, IntWritable>(text, intwritable);
>             }
>         })
>         .saveAsNewAPIHadoopFile("/mllib/data/clusteroutput_seq",
>                 Text.class, IntWritable.class, SequenceFileOutputFormat.class);
>
> Regards,
> Skanda
>
