kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Milind Parikh <milindpar...@gmail.com>
Subject Re: Re: can high-level consumer api provide the method getting messages with non-block?
Date Thu, 08 Mar 2012 01:19:36 GMT
I introduced a sleep to let rebalance make place to account for 242 before
new createmessagestream. However to no avail.

Therefore it appears that without the resolution of 242, the
consumer.timeout.ms parameter would not work.

Thoughts/workaround?

My current workaround is inside the while(true){}; but not ideal as I have
to make some counter to increase and check counter at every iteration of
while(it.next()).

Regards
Milind


On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <neha.narkhede@gmail.com>wrote:

> You are probably hitting this bug in Kafka -
> https://issues.apache.org/jira/browse/KAFKA-242
>
> Thanks,
> Neha
>
> 2012/3/4  <zlai_2001@sina.com>:
> > I modify code as beblow. When the program createmessagestream again, it
> can not get any message although there are some new messages. How could I
> resolve it? Thanks!
> >
> > ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put("topic", new Integer(1));
> > while(true){
> > Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> > KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
> > ConsumerIterator<Message> it = stream.iterator();
> > try{
> > while(it.hasNext())
> > {
> > ByteBuffer buffer = it.next().payload();
> > byte [] bytes = new byte[buffer.remaining()];
> > buffer.get(bytes);
> > System.out.println(new String(bytes));
> > }
> > }
> > catch (ConsumerTimeoutException e){
> >     e.printStackTrace();
> >  }
> >  catch (Exception e){
> >   e.printStackTrace();
> >  }
> > }
> >
> >
> >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message