spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Spark streaming not processing messages from partitioned topics
Date Wed, 10 Aug 2016 03:35:58 GMT
No, you don't need a conditional.  read.json on an empty rdd will
return an empty dataframe.  Foreach on an empty dataframe or an empty
rdd won't do anything (a task will still get run, but it won't do
anything).
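
A minimal sketch of the empty-batch behaviour described above, for
anyone who wants to check it outside the streaming job. It assumes the
Spark 1.4-era SQLContext API used elsewhere in this thread; the object
name EmptyBatchSketch, the helper emptyBatchRowCount and the local[2]
master are made up for illustration and are not from the original code.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical helper object, not part of the original job.
object EmptyBatchSketch {
  // Parses an empty batch as JSON and returns the number of rows produced.
  def emptyBatchRowCount(sc: SparkContext): Long = {
    val sqlContext = new SQLContext(sc)
    // Stand-in for rdd.map(_._2) on a batch that received no Kafka messages.
    val emptyBatch = sc.parallelize(Seq.empty[String])
    // Schema inference sees no records, so the resulting DataFrame has no rows.
    val dataFrame = sqlContext.read.json(emptyBatch)
    dataFrame.printSchema()
    // A job is still scheduled, but the function is never called on a row.
    dataFrame.foreach(row => println(row))
    dataFrame.count()
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch can run without a cluster.
    val conf = new SparkConf().setMaster("local[2]").setAppName("empty-batch-sketch")
    val sc = new SparkContext(conf)
    try println(s"rows in empty batch: ${emptyBatchRowCount(sc)}")
    finally sc.stop()
  }
}
```

If this runs cleanly, the empty-batch case is probably not what is
hanging, and the guard around read.json can go.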

Leave the conditional out.  Add one thing at a time to the working
rdd.foreach example and see when it stops working, then take a closer
look at the logs.
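
One way to do the "one thing at a time" approach is sketched below.
The step names and the IncrementalDebugSketch object are hypothetical;
the only assumptions about the job are that the stream carries
(key, value) string pairs, as the direct stream in the quoted code
does, and that the same SQLContext is available. Swap in one step at a
time and stop at the first one that hangs.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object, not part of the original job.
object IncrementalDebugSketch {
  // Step 1: the known-good baseline, printing raw (key, value) pairs.
  def step1(stream: DStream[(String, String)]): Unit =
    stream.foreachRDD { rdd => rdd.foreach(println) }

  // Step 2: add the projection to message values only.
  def step2(stream: DStream[(String, String)]): Unit =
    stream.foreachRDD { rdd => rdd.map(_._2).foreach(println) }

  // Step 3: add JSON parsing, still with no conditional.
  def step3(stream: DStream[(String, String)], sqlContext: SQLContext): Unit =
    stream.foreachRDD { rdd =>
      val dataFrame = sqlContext.read.json(rdd.map(_._2))
      dataFrame.printSchema()
    }

  // Step 4: add the row-level action on the parsed DataFrame.
  def step4(stream: DStream[(String, String)], sqlContext: SQLContext): Unit =
    stream.foreachRDD { rdd =>
      val dataFrame = sqlContext.read.json(rdd.map(_._2))
      dataFrame.foreach(row => println(row))
    }
}
```

For example, replace the foreachRDD call in the job with
IncrementalDebugSketch.step1(kafkaStream), confirm output, then move
to step2, and so on.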


On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
<diwakar.dhanuskodi@gmail.com> wrote:
> Hi Cody,
>
> Without the conditional, it runs fine. But any processing inside the
> conditional seems to hang or wait on something.
> I am facing this issue with partitioned topics. I need the conditional to
> skip processing when the batch is empty.
> kafkaStream.foreachRDD(
>   rdd => {
>
>    val dataFrame = sqlContext.read.json(rdd.map(_._2))
>    /*if (dataFrame.count() > 0) {
>    dataFrame.foreach(println)
>    }
>    else
>    {
>      println("Empty DStream ")
>    }*/
> })
>
> On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <cody@koeninger.org> wrote:
>>
>> Take out the conditional and the sqlcontext and just do
>>
>> rdd => {
>>   rdd.foreach(println)
>> }
>>
>> as a base line to see if you're reading the data you expect
>>
>> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
>> <diwakar.dhanuskodi@gmail.com> wrote:
>> > Hi,
>> >
>> > I am reading JSON messages from Kafka. The topic has 2 partitions. When
>> > running the streaming job using spark-submit, I can see that
>> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) executes indefinitely.
>> > Am I doing something wrong here? The code is below. This environment is a
>> > Cloudera sandbox. The same issue occurs on the Hadoop production cluster,
>> > but access there is restricted, which is why I tried to reproduce the
>> > issue in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>> >
>> > val kafkaParams = Map[String, String](
>> >   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>> >   "group.id" -> "xyz",
>> >   "auto.offset.reset" -> "smallest")
>> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> > val ssc = new StreamingContext(conf, Seconds(1))
>> >
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> >
>> > val topics = Set("gpp.minf")
>> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
>> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>> >
>> > kafkaStream.foreachRDD(
>> >   rdd => {
>> >     if (rdd.count > 0) {
>> >       val dataFrame = sqlContext.read.json(rdd.map(_._2))
>> >       dataFrame.printSchema()
>> >       // dataFrame.foreach(println)
>> >     }
>> >   }
>> > )
>
>


