spark-user mailing list archives

From Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Subject Re: Spark streaming not processing messages from partitioned topics
Date Wed, 10 Aug 2016 03:20:23 GMT
Hi Cody,

Without the conditional, it runs fine. But any processing inside the
conditional ends up waiting (or something similar).
I am facing this issue with partitioned topics. I need the conditional to skip
processing when the batch is empty.
kafkaStream.foreachRDD(
  rdd => {
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    /*if (dataFrame.count() > 0) {
      dataFrame.foreach(println)
    }
    else
    {
      println("Empty DStream ")
    }*/
  })
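
For reference, one way to express "skip processing when the batch is empty"
is to test the RDD itself before building the DataFrame, rather than counting
the DataFrame afterwards. The following is only a sketch under assumptions:
the object name EmptyBatchSketch and the helper processNonEmptyBatches are
hypothetical and not from this thread, and the stream and sqlContext are
assumed to be created as in the quoted code below. RDD.isEmpty() only needs
to find a single element instead of counting every record. Whether this
avoids the hang described here has not been established in the thread.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical wrapper object, used only so the sketch compiles on its own.
object EmptyBatchSketch {

  // Hypothetical helper: `stream` is assumed to carry (key, value) pairs,
  // as returned by KafkaUtils.createDirectStream[String, String, ...].
  def processNonEmptyBatches(stream: DStream[(String, String)],
                             sqlContext: SQLContext): Unit = {
    stream.foreachRDD { (rdd: RDD[(String, String)]) =>
      if (rdd.isEmpty()) {
        // Nothing arrived in this batch interval, so skip the JSON parsing.
        println("Empty batch")
      } else {
        // Parse only the message values (second tuple element) as JSON.
        val dataFrame = sqlContext.read.json(rdd.map(_._2))
        dataFrame.printSchema()
        dataFrame.show()
      }
    }
  }
}
```

It would be called as EmptyBatchSketch.processNonEmptyBatches(kafkaStream,
sqlContext) before ssc.start(). Checking the RDD first also means the JSON
schema inference is not run at all for empty batches.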

On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <cody@koeninger.org> wrote:

> Take out the conditional and the sqlcontext and just do
>
> rdd => {
>   rdd.foreach(println)
>
>
> as a base line to see if you're reading the data you expect
>
> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> <diwakar.dhanuskodi@gmail.com> wrote:
> > Hi,
> >
> > I am reading JSON messages from Kafka. The topic has 2 partitions. When
> > running the streaming job using spark-submit, I can see that
> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes indefinitely.
> > Am I doing something wrong here? The code is below. This environment is
> > the Cloudera sandbox. The same issue occurs in Hadoop production cluster
> > mode, but that environment is restricted, which is why I tried to
> > reproduce the issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
> >
> > val kafkaParams =
> > Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> > val ssc = new StreamingContext(conf, Seconds(1))
> >
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >
> > val topics = Set("gpp.minf")
> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> >     if (rdd.count > 0) {
> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >       dataFrame.printSchema()
> >       //dataFrame.foreach(println)
> >     }
> >   })
>
