spark-user mailing list archives

From Spark Enthusiast <sparkenthusi...@yahoo.in>
Subject Serialization Exception
Date Mon, 29 Jun 2015 12:14:54 GMT
For prototyping purposes, I created a test program that injects dependencies using Spring.

Nothing fancy. This is just a rewrite of KafkaDirectWordCount. When I run it, I get the
following exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
    at com.olacabs.spark.examples.WordCountProcessorKafkaImpl.process(WordCountProcessorKafkaImpl.java:45)
    at com.olacabs.spark.examples.WordCountApp.main(WordCountApp.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream
is being serialized  possibly as a part of closure of an RDD operation. This is because 
the DStream object is being referred to from within the closure.  Please rewrite the RDD
operation inside this DStream to avoid this.  This has been enforced to avoid bloating of
Spark tasks  with unnecessary objects.
Serialization stack:
    - object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext,
value: org.apache.spark.streaming.api.java.JavaStreamingContext@7add323c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl, name: streamingContext,
type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl, com.olacabs.spark.examples.WordCountProcessorKafkaImpl@29a1505c)
    - field (class: com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, name: this$0,
type: class com.olacabs.spark.examples.WordCountProcessorKafkaImpl)
    - object (class com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1, com.olacabs.spark.examples.WordCountProcessorKafkaImpl$1@c6c82aa)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name:
fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 23 more



Can someone help me figure out why?


Here is the code:

public interface EventProcessor extends Serializable {
    void process();
}
public class WordCountProcessorKafkaImpl implements EventProcessor {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Autowired
    @Qualifier("streamingContext")
    JavaStreamingContext streamingContext;

    @Autowired
    @Qualifier("inputDStream")
    JavaPairInputDStream<String, String> inputDStream;

    @Override
    public void process() {
        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = inputDStream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();

        // Start the computation
        streamingContext.start();
        streamingContext.awaitTermination();

    }
}
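For what it's worth, the serialization stack in the trace points at the hidden `this$0` field: each anonymous `Function` keeps a reference to the enclosing `WordCountProcessorKafkaImpl`, so serializing the closure drags in the `streamingContext` field, which is not serializable. A minimal plain-Java sketch of the same mechanism (all class names here are hypothetical stand-ins, not Spark types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo implements Serializable {

    // Stand-in for JavaStreamingContext: a field whose type is NOT serializable.
    static class Context { }

    Context streamingContext = new Context();

    String label = "word";

    // A Serializable function type, standing in for Spark's Function interface.
    interface SerFunction extends Serializable {
        String call();
    }

    // Anonymous inner class that reads an outer field: it holds a this$0
    // reference to the enclosing CaptureDemo, so serializing it also tries
    // to serialize the non-serializable Context field and fails.
    SerFunction anonymous() {
        return new SerFunction() {
            public String call() { return label; }
        };
    }

    // Static nested class: no outer reference, serializes cleanly.
    static class Standalone implements SerFunction {
        public String call() { return "word"; }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;   // same failure mode as the Spark trace above
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println(serializes(demo.anonymous()));   // false
        System.out.println(serializes(new Standalone()));   // true
    }
}
```

The usual workarounds for code shaped like the excerpt above are to mark the `streamingContext` and `inputDStream` fields `transient`, or to define the `Function`/`FlatMapFunction` implementations as static nested (or top-level) classes so no reference to the enclosing object is captured.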

