spark-user mailing list archives

From Laeeq Ahmed <laeeqsp...@yahoo.com.INVALID>
Subject Saving partial (top 10) DStream windows to hdfs
Date Mon, 05 Jan 2015 17:38:09 GMT
Hi,
I am counting the values in each window and finding the top values, and I want to save only the 10 most
frequent values of each window to HDFS, rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))

I can print the top 10 with the commented-out foreachRDD line above.
I have also tried
sortedCounts.foreachRDD { rdd =>
  ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
}

but I get the following error.
15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
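
[Editor's note: a likely fix, sketched here as an assumption rather than part of the original message. The NotSerializableException typically occurs because the closure passed to foreachRDD captures the StreamingContext through `ssc`, and the StreamingContext is not serializable. Obtaining the SparkContext from the RDD itself (via `rdd.context`) keeps `ssc` out of the closure. This sketch assumes the same `sortedCounts` stream, loop variable `a`, and output path as above:]

```scala
// Sketch: reference the SparkContext through the RDD instead of capturing `ssc`,
// so the non-serializable StreamingContext is not pulled into the closure.
sortedCounts.foreachRDD { rdd =>
  // rdd.take(10) brings the top 10 rows to the driver; parallelize writes them
  // back out as a small RDD so saveAsTextFile can be used.
  rdd.context.parallelize(rdd.take(10))
    .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a + 1))
}
```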
Regards,
Laeeq