If you are able to get an RDD count, it means you have received data. Try adding a default case to the pattern match to debug.
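For example (a minimal sketch; the (value, count) element shape is assumed from the snippet further down the thread):

newStream.foreachRDD(rdd => {
  rdd.foreach {
    case (value, count) =>
      println("value --> " + value + " with count --> " + count)
    case other =>
      // default case: surfaces any element that does not match the expected shape
      println("unexpected element: " + other)
  }
})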


On 23 January 2014 15:55, Prashant Sharma <scrapcodes@gmail.com> wrote:
You can also check the logs in the work directory. I suspect the Spark receiver is, for some reason, unable to connect to the specified Kafka stream. One quick diagnostic is to use a socketStream instead; the stream can simply be created (faked) with the netcat utility on Unix.
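For example (a minimal sketch using socketTextStream, the text-based variant; host, port, and batch interval are placeholders, and conf stands for your existing SparkConf):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(conf, Seconds(1))
// Read from a plain socket instead of Kafka to isolate the receiver
val stream = ssc.socketTextStream("localhost", 9999)
stream.print() // should echo whatever is typed into netcat
ssc.start()
ssc.awaitTermination()

Then, in a shell on the same host, feed it some lines:

nc -lk 9999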


On Thu, Jan 23, 2014 at 3:50 PM, Sourav Chandra <sourav.chandra@livestream.com> wrote:
Hi Anita,

It did not help.

If I use newStream.print(), it shows the RDDs in the stream.
The following also works:

newStream.foreachRDD(rdd => {
  println(rdd.count())        // prints the count
  println(rdd.collect.toList) // prints the RDD contents as a list
})

But the following does not print anything:

newStream.foreachRDD(rdd => {
  rdd.foreach({
    case (value, count) => {
      println("##########################################")
      println("value --> " + value + " with count --> " + count)
      println("##########################################")
    }
  })
})
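It may be worth noting (an inference, not confirmed in this thread) that rdd.foreach runs on the executors, so its println output lands in the worker stdout logs rather than the driver console; that would match the symptoms above, since print() and collect both bring data back to the driver. A sketch that prints on the driver instead:

newStream.foreachRDD(rdd => {
  // take() pulls a bounded sample back to the driver, where println is visible
  rdd.take(10).foreach { case (value, count) =>
    println("value --> " + value + " with count --> " + count)
  }
})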

Thanks,
Sourav


On Thu, Jan 23, 2014 at 3:35 PM, Anita Tailor <tailor.anita@gmail.com> wrote:
Hi Sourav,

From "foreach not working" you mean Job is not getting scheduled at batch interval?
I came across a similar issue in standalone mode. You can try increasing your batch interval.

I increased the priority of the RecurringTimer thread (incubator-spark/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala) to get it working.
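For example (a sketch; the 10-second interval is just an arbitrary larger value to try, and the window/slide durations must stay multiples of the batch interval):

val ssc = new StreamingContext(conf, Seconds(10)) // was Seconds(1)
val newStream = processedStream.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(10), 2)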

Regards
Anita




On 23 January 2014 14:31, Sourav Chandra <sourav.chandra@livestream.com> wrote:
Hi,

I am using Spark Streaming with a Kafka DStream, running the application against a standalone cluster.


It seems that after the transformation, when I do foreachRDD, it is not working.

The code snippet is below:
---------------------------------------------------------------
val ssc = new StreamingContext(...)
val stream = KafkaUtils.createStream(...)
val processedStream = stream.flatMap(...)
val newStream = processedStream.map(x => (x, 1L))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(1), Seconds(1), 2)

newStream.foreachRDD(rdd => {
  rdd.foreach({
    case (value, count) => {
      println("##########################################")
      println("value --> " + value + " with count --> " + count)
      println("##########################################")
    }
  })
})

---------------------------------------------------------------
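One detail worth double-checking (an assumption, since the snippet above is abbreviated): the inverse-function form of reduceByKeyAndWindow requires a checkpoint directory to be set, e.g.:

// Needed because reduceByKeyAndWindow is called with an inverse reduce
// function; the path below is a placeholder
ssc.checkpoint("/tmp/spark-streaming-checkpoint")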

If I run the application locally (using local instead of spark://), it works.

Can you suggest what is going on here?

--

Sourav Chandra

Senior Software Engineer

sourav.chandra@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--
Prashant


