spark-user mailing list archives

From Laeeq Ahmed <laeeqsp...@yahoo.com.INVALID>
Subject Re: Saving partial (top 10) DStream windows to hdfs
Date Wed, 07 Jan 2015 15:38:54 GMT
Hi,
I applied it as follows:
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => math.round(x.toDouble)).countByValueAndWindow(Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
val topCounts = sortedCounts.mapPartitions(rdd => rdd.take(10))
//val topCounts = sortedCounts.transform(rdd => ssc.sparkContext.makeRDD(rdd.take(10)))
topCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
topCounts.print()
It gives the output with 10 extra values in each window. I think mapPartitions works on each partition of the RDD rather than on the RDD as a whole. I also tried the commented code; it gives the correct result, but at the start it gives a serialisation error:
ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Output for the mapPartitions version above. The values after the first ten in each window look extra to me:

0,578  -3,576  4,559  -1,556  3,553  -6,540  6,538  -4,535  1,526  10,483
94,8  -113,8  -137,8  -85,8  -91,8  -121,8  114,8  108,8  93,8  101,8
1,128  -8,118  3,112  -4,110  -13,108  4,108  -3,107  -10,107  -6,106  8,105
76,6  74,6  60,6  52,6  70,6  71,6  -60,6  55,6  78,5  64,5

and so on.
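A minimal sketch of a fix for that serialisation error, assuming the cause is that the transform closure captures ssc: take the SparkContext from the RDD itself (rdd.context), so the StreamingContext never enters the closure that Spark serialises:

val topCounts = sortedCounts.transform(rdd => rdd.context.makeRDD(rdd.take(10), 1))

Here rdd.take(10) collects the ten largest pairs to the driver (the RDD is already sorted), and makeRDD turns them back into a single-partition RDD that saveAsTextFiles can write.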
Regards,
Laeeq
 

     On Tuesday, January 6, 2015 9:06 AM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
   

 You can try something like:

val top10 = your_stream.mapPartitions(rdd => rdd.take(10))
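Note that mapPartitions applies take(10) to every partition, so with more than one partition this returns up to 10 values per partition rather than 10 overall. A minimal sketch that coalesces to a single partition first, assuming each window's data fits comfortably in one partition:

val top10 = your_stream.transform(rdd => rdd.coalesce(1)).mapPartitions(iter => iter.take(10))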

Thanks
Best Regards
On Mon, Jan 5, 2015 at 11:08 PM, Laeeq Ahmed <laeeqspark@yahoo.com.invalid> wrote:

Hi,
I am counting the values in each window to find the top values, and I want to save only the 10 most frequent values of each window to HDFS rather than all the values.
eegStreams(a) = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) -> 1), StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
val counts = eegStreams(a).map(x => (math.round(x.toDouble), 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(4), Seconds(4))
val sortedCounts = counts.map(_.swap).transform(rdd => rdd.sortByKey(false)).map(_.swap)
//sortedCounts.foreachRDD(rdd => println("\nTop 10 amplitudes:\n" + rdd.take(10).mkString("\n")))
sortedCounts.map(tuple => "%s,%s".format(tuple._1, tuple._2)).saveAsTextFiles("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1))
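Side note: the inverse-function form of reduceByKeyAndWindow used above requires checkpointing to be enabled. A one-line sketch, with a hypothetical checkpoint directory:

ssc.checkpoint("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/checkpoint")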

I can print the top 10 with the commented-out foreachRDD line above.
I have also tried:

sortedCounts.foreachRDD { rdd => ssc.sparkContext.parallelize(rdd.take(10)).saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1)) }

but I get the following error:

15/01/05 17:12:23 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
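A minimal sketch of a possible fix, assuming the error comes from the StreamingContext being dragged into the foreachRDD closure: take the SparkContext from the RDD itself (rdd.context) instead of ssc, and use the batch time to give each window its own output directory, since saveAsTextFile, unlike saveAsTextFiles, fails if the target directory already exists:

sortedCounts.foreachRDD { (rdd, time) =>
  val top10 = rdd.take(10)  // ten largest pairs, collected to the driver
  rdd.context.parallelize(top10, 1)
    .map(tuple => "%s,%s".format(tuple._1, tuple._2))
    .saveAsTextFile("hdfs://ec2-23-21-113-136.compute-1.amazonaws.com:9000/user/hduser/output/" + (a+1) + "-" + time.milliseconds)
}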
Regards,
Laeeq



   