kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From <zlai_2...@sina.com>
Subject Re: Re: can high-level consumer api provide the method getting messages with non-block?
Date Mon, 05 Mar 2012 03:29:54 GMT
I modify code as beblow. When the program createmessagestream again, it can not get any message
although there are some new messages. How could I resolve it? Thanks!
 
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("topic", new Integer(1));
while(true){
Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
ConsumerIterator<Message> it = stream.iterator();
try{
while(it.hasNext())
{
ByteBuffer buffer = it.next().payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println(new String(bytes));
}
}
catch (ConsumerTimeoutException e){
     e.printStackTrace();
 }
 catch (Exception e){
   e.printStackTrace();
 }
}




Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message