spark-user mailing list archives

From <luohui20...@sina.com>
Subject Reply: Re: How SparkStreaming output messages to Kafka?
Date Mon, 30 Mar 2015 06:58:24 GMT
 
 
To Saisai:
        It works after I corrected my code following your advice, as below.
        However, I am not quite clear about which code runs on the driver and which runs on the executors, so I wrote my understanding in the comments (and in a stripped-down summary after the code). Would you help check it? Thank you.
 
To Akhil:
      Yes, Kafka has enough messages. I also tested it with a Kafka producer sending random Scala Ints, and that works too. Thanks.
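The test producer was roughly like this (just a sketch from memory; RandomIntProducer is a placeholder name, and the broker/topic are the ones from my local setup):

import java.util.Properties
import scala.util.Random
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object RandomIntProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    // send a random Int (as a string) to topic1 once per second
    while (true) {
      producer.send(new KeyedMessage[String, String]("topic1", Random.nextInt().toString))
      Thread.sleep(1000)
    }
  }
}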
 
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingSampleDirectApproach2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // everything from here down to ssc.start() runs on the driver and only *defines* the job
    val Array(brokers, topics) = Array("localhost:9092", "topic1")
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SparkStreamingSampleDirectApproach")
      .set("log4j.rootCategory", "WARN, console")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    //    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
    //    val lines = messages.map(_._2)
    //    val words = lines.flatMap(_.split(" "))
    //    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    //    wordCounts.print()

    val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
    val props = new Properties()    // Properties is serializable, so it can be captured below
    props.put("metadata.broker.list", brokers2)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    messages.foreachRDD { rdd =>          // this closure runs on the driver, once per batch
      rdd.foreachPartition { records =>   // code from here on runs on the executors
        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)
        records.foreach { piece =>
          val msg = new KeyedMessage[String, String](topic2, piece._2)
          producer.send(msg)
        }
        producer.close()
      }
    }
    //    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
    //    println(messages2)

    ssc.start()            // runs on the driver and actually starts the streaming job
    ssc.awaitTermination()
  }
}
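And the stripped-down summary of how I understand the split (please correct me if this is wrong):

messages.foreachRDD { rdd =>          // driver: runs once per batch interval
  rdd.foreachPartition { records =>   // executor: runs once per partition of the batch,
                                      // so per-partition setup (the producer) belongs here
    records.foreach { piece =>        // executor: runs once per record
      // send piece to Kafka
    }
  }
}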
 




--------------------------------


 
Thanks&amp;Best regards!
罗辉 San.Luo



----- Original Message -----
From: Saisai Shao <sai.sai.shao@gmail.com>
To: luohui20001@sina.com
Cc: user <user@spark.apache.org>
Subject: Re: How SparkStreaming output messages to Kafka?
Date: 2015-03-30 14:03


Hi Hui, 


Did you try the direct Kafka stream example under Spark Streaming's examples? Does it still
fail to receive messages? Also, would you please check the whole setup, including Kafka itself:
test with the Kafka console consumer to see if Kafka is OK.


Besides, looking at your code, there are some problems here:


    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
    println(messages2)
    producer.send(messages2)



This code snippet is not lazily evaluated; it is executed ONLY ONCE when execution reaches
this point, so you may not actually be writing the stream's data into Kafka. You need to write it like this:


messages.foreachRDD { r =>
  r.foreachPartition { iter =>
    // create Producer here
    // turn this partition of data (iter) into KeyedMessages and write them into Kafka
  }
}


This is the basic style (sorry for any missing parts and typos). Also pay attention to serialization
issues when the code needs to create objects on the remote executor side. Please give it another try.
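
To make the serialization point concrete, a sketch of what I mean (props and topic2 as in your code): the Kafka producer is not serializable, so creating it on the driver and using it inside the closure would fail:

// DON'T do this: the producer is created on the driver, and Spark will
// throw a "Task not serializable" error when it tries to ship it inside
// the closure to the executors.
val producer = new Producer[String, String](new ProducerConfig(props))
messages.foreachRDD { r =>
  r.foreachPartition { iter =>
    iter.foreach(msg => producer.send(new KeyedMessage[String, String](topic2, msg._2)))
  }
}

So create the producer inside foreachPartition instead, as in the skeleton above.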


Thanks
Jerry






2015-03-30 13:34 GMT+08:00 <luohui20001@sina.com>:



Hi guys,
          I am using Spark Streaming to receive messages from Kafka, process them, and then send
them back to Kafka. However, the Kafka consumer cannot receive any messages. Anyone care to share ideas?
 
Here is my code:
 
object SparkStreamingSampleDirectApproach {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val Array(brokers, topics) = Array("localhost:9092", "topic1")
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("SparkStreamingSampleDirectApproach")
      .set("log4j.rootCategory", "WARN, console")
    val ssc = new StreamingContext(conf, Seconds(1))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
//    messages.saveAsTextFiles("hdfs://localhost:8020/spark/data", "test")
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    val Array(brokers2, topic2) = Array("localhost:9092", "topic2")
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)
//    val messages2 = messages.map{line =>
//      new KeyedMessage[String, String](topic2, wordCounts.toString())
//    }.toArray

    val messages2 = new KeyedMessage[String, String](topic2, messages.toString())
    println(messages2)

    producer.send(messages2)

    ssc.start()
    ssc.awaitTermination()
  }
}


--------------------------------


 
Thanks&amp;Best regards!
罗辉 San.Luo
