spark-user mailing list archives

From danielil <daniel.ha...@veracity-group.com>
Subject Using sparkContext inside a map function
Date Sun, 08 Mar 2015 16:14:46 GMT
Hi,
We are designing a solution that pulls file paths from Kafka and, for the current stage, just counts the lines in each of these files.
When we run the code, it fails with:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:444)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.mapToPair(JavaDStreamLike.scala:146)
    at org.apache.spark.streaming.api.java.JavaPairDStream.mapToPair(JavaPairDStream.scala:46)
    at streamReader.App.main(App.java:66)

Is using the sparkContext from inside a map function wrong?
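
Our (possibly mistaken) reading of the stack trace is that Spark serializes the function passed to mapToPair before shipping it to the executors, together with everything it captures, roughly along these lines (a simplification on our part, not the actual ClosureCleaner code; pairFunction here just stands for the anonymous PairFunction in the code below):

    // sketch of the check we think is failing: serializing the function plus whatever it captures
    java.io.ObjectOutputStream out =
        new java.io.ObjectOutputStream(new java.io.ByteArrayOutputStream());
    // fails because the function captures jssc, and SparkContext is not serializable
    out.writeObject(pairFunction);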

This is the code we are using:
    SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("spark://namenode:7077");

    // KAFKA
    final JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("uploadedFiles", 1);
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, "localhost:2181", "group3", topicMap);

    JavaDStream<String> files = messages.map(new Function<Tuple2<String, String>, String>() {
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaPairDStream<String, Integer> pairs = messages.mapToPair(
            new PairFunction<Tuple2<String, String>, String, Integer>() {
                public Tuple2<String, Integer> call(Tuple2<String, String> word) throws Exception {
                    // this anonymous function captures jssc (and with it the SparkContext)
                    JavaRDD<String> textfile = jssc.sparkContext().textFile(word._2());
                    int test = new Long(textfile.count()).intValue();
                    return new Tuple2<String, Integer>(word._2(), test);
                }
            });

    System.out.println("Printing Messages:");
    pairs.print();

    jssc.start();
    jssc.awaitTermination();
    jssc.close();
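
In case it helps, this is the direction we were thinking of instead: keep the SparkContext on the driver and read each file from inside foreachRDD rather than mapToPair. The sketch below is only our assumption of how that should look (it relies on the Spark 1.x foreachRDD signature that takes a Function<JavaRDD<String>, Void>, and it collects the paths back to the driver), so please correct us if this is also wrong:

    files.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            // this closure runs on the driver, so using jssc.sparkContext() here should be fine
            for (String path : rdd.collect()) { // only the list of paths comes back to the driver
                // the count itself is still a distributed Spark job
                long lines = jssc.sparkContext().textFile(path).count();
                System.out.println(path + " -> " + lines);
            }
            return null;
        }
    });

With that shape, the only closures shipped to the executors would be the plain map/count functions, not anything holding the streaming context.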

Thanks,
Daniel



