Hi Cody, 

Without the conditional it works fine, but any processing inside the conditional hangs (or waits on something).
I'm facing this issue with partitioned topics. I need a conditional to skip processing when the batch is empty:
kafkaStream.foreachRDD(
  rdd => {
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    /*if (dataFrame.count() > 0) {
      dataFrame.foreach(println)
    } else {
      println("Empty DStream")
    }*/
  })
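
For the skip itself, testing the RDD before building the DataFrame should also be cheaper than dataFrame.count() (a sketch, assuming RDD.isEmpty, available since Spark 1.3, which stops after finding a single element instead of scanning the whole batch):

kafkaStream.foreachRDD { rdd =>
  // isEmpty stops as soon as it finds one element, so it is much
  // cheaper per batch than counting the whole DataFrame
  if (!rdd.isEmpty()) {
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.foreach(println)
  } else {
    println("Empty DStream")
  }
}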

On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <cody@koeninger.org> wrote:
Take out the conditional and the sqlcontext and just do

rdd => {
  rdd.foreach(println)
}

as a baseline to see if you're reading the data you expect
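
If nothing prints, logging the offset ranges the direct stream claims for each batch can also help (a sketch using HasOffsetRanges from spark-streaming-kafka; the cast works because the direct stream's batch RDDs implement it). It shows per-partition activity without deserializing any messages:

import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaStream.foreachRDD { rdd =>
  // each batch RDD from the direct stream carries its Kafka offsets;
  // printing them shows whether both partitions are actually advancing
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}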

On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
<diwakar.dhanuskodi@gmail.com> wrote:
> Hi,
>
> I am reading json messages from Kafka. The topic has 2 partitions. When
> running the streaming job via spark-submit, I can see that val dataFrame =
> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> something wrong here? The code is below. This environment is a Cloudera
> sandbox; the same issue occurs in the Hadoop production cluster (cluster
> mode), but access there is restricted, which is why I tried to reproduce
> it in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>
> val kafkaParams = Map[String, String](
>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>   "group.id" -> "xyz",
>   "auto.offset.reset" -> "smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
>     if (rdd.count > 0) {
>       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>       dataFrame.printSchema()
>       // dataFrame.foreach(println)
>     }
>   })