spark-user mailing list archives

From Hafiz Mujadid <hafizmujadi...@gmail.com>
Subject Getting first N messages from a Kafka topic using Spark Streaming
Date Wed, 03 Dec 2014 07:35:08 GMT
Hi Experts!

Is there a way to read the first N messages from a Kafka stream, put them in
some collection, return them to the caller for visualization purposes, and
then close Spark Streaming?
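One possible approach, sketched under assumptions: the function passed to DStream.foreachRDD runs on the driver, so it can call rdd.take(k) to pull at most k messages back, and the calling thread can wait until N have arrived (or a timeout expires) before stopping the StreamingContext. The block below models only that driver-side bookkeeping in plain Scala, with simulated batches on a second thread standing in for Kafka. FirstNCollector, offer, remaining and await are hypothetical names, and the Spark wiring shown in the comment is untested.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical driver-side helper: keeps the first `limit` messages it is offered
// and lets the calling thread wait until all of them have arrived.
final class FirstNCollector(limit: Int) {
  require(limit > 0, "limit must be positive")
  private val buf = ArrayBuffer.empty[String]
  private val full = new CountDownLatch(1)

  // How many messages are still wanted (a natural argument for rdd.take).
  def remaining: Int = synchronized { limit - buf.size }

  // Accept one batch; anything beyond the limit is dropped.
  def offer(batch: Seq[String]): Unit = synchronized {
    buf ++= batch.take(limit - buf.size)
    if (buf.size >= limit) full.countDown()
  }

  // Block until `limit` messages arrived or `timeoutMs` elapsed, then return
  // whatever has been collected so far.
  def await(timeoutMs: Long): Vector[String] = {
    full.await(timeoutMs, TimeUnit.MILLISECONDS)
    synchronized { buf.toVector }
  }
}

// Possible Spark wiring (untested sketch; foreachRDD bodies run on the driver):
//   stream.foreachRDD { rdd =>
//     val want = collector.remaining
//     if (want > 0) collector.offer(rdd.take(want).toSeq)
//   }
//   ssc.start()
//   val sample = collector.await(timeoutMs = 5000)
//   ssc.stop(true)

// Stand-in for the streaming job: delivers simulated batches from another thread.
val collector = new FirstNCollector(3)
val producer = new Thread(() => {
  for (batch <- Seq(Seq("a", "b"), Seq("c", "d", "e"), Seq("f"))) {
    collector.offer(batch)
    Thread.sleep(10)
  }
})
producer.start()

val firstThree = collector.await(timeoutMs = 5000)
producer.join()
println(firstThree)  // Vector(a, b, c)
```

Bounding each take by what is still missing keeps at most N messages travelling to the driver in total, and the caller, not the streaming job, decides when to stop the context.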

I would be glad to hear from you and thankful for any help.

Currently I have the following code:

import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// zkQuorum, group, topics, numThreads, sink, interval, sample (a StringBuilder)
// and size are vars defined elsewhere in the enclosing class.
def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
  // Override the defaults with any parameters the caller supplied.
  zkQuorum = params.getOrElse("zookeeperQourum", zkQuorum)
  group = params.getOrElse("userGroup", group)
  topics = params.getOrElse("topics", topics)
  numThreads = params.getOrElse("numberOfThreads", numThreads)
  sink = params.getOrElse("sink", sink)
  interval = params.get("batchInterval").map(_.toInt).getOrElse(interval)

  val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
  val ssc = new StreamingContext(sparkConf, Seconds(interval))

  // topic -> number of consumer threads, e.g. "t1,t2" -> Map(t1 -> n, t2 -> n)
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val consumerConfig = Map(
    "auto.offset.reset" -> "smallest",  // if the group has no committed offset, start from the earliest
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> group)

  // Keep only the message values (the second element of each key/value pair).
  val data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
  val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))

  streams.foreachRDD(rdd => rdd.foreachPartition(itr => {
    while (itr.hasNext && size >= 0) {
      val msg = itr.next()
      println(msg)
      sample.append(msg)
      sample.append("\n")
      size -= 1
    }
  }))

  ssc.start()
  ssc.awaitTermination(5000)  // block for at most 5000 ms
  ssc.stop(true)              // also stops the underlying SparkContext
}

Here sample is a StringBuilder. When I print its contents after the getsample
call returns, it is empty.
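A likely cause, offered as a hedged reading rather than a confirmed diagnosis: with a cluster master such as spark://cloud2-server:7077, the function passed to foreachPartition is serialized and executed on the executors, so the sample and size it touches are deserialized copies. Appends land in those copies and the driver's StringBuilder stays empty; for the same reason each partition's task starts from its own copy of size. The Spark programming guide's section on understanding closures describes this and suggests bringing data back to the driver with collect() or take(), or using accumulators for counters. The sketch below imitates only the copying step with plain Java serialization; roundTrip is a hypothetical helper, not a Spark API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize an object and read it back, roughly what happens
// to the objects a task closure captures when the task is shipped to an executor.
def roundTrip[T <: java.io.Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}

// The driver's buffer (java.lang.StringBuilder is used because it is Serializable).
val driverSample = new java.lang.StringBuilder()
// What a task on an executor actually works with: a deserialized copy.
val executorSample = roundTrip(driverSample)

// The "executor" appends the messages it reads to its own copy only.
for (msg <- Seq("msg-1", "msg-2")) executorSample.append(msg).append("\n")

println(s"driver:   [$driverSample]")  // driver:   []
println(s"executor: [$executorSample]")
```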


Any help will be appreciated.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/getting-firs-N-messages-froma-Kafka-topic-using-Spark-Streaming-tp20227.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


