kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Re: can high-level consumer api provide the method getting messages with non-block?
Date Thu, 08 Mar 2012 16:39:43 GMT
>> Therefore it appears that without the resolution of 242, the
>> consumer.timeout.ms parameter would not work.

Can you please elaborate on this? Do you have some test code we can look at?

Thanks,
Neha

On Wed, Mar 7, 2012 at 5:19 PM, Milind Parikh <milindparikh@gmail.com> wrote:
> I introduced a sleep before the new createMessageStreams call to let the
> rebalance take place, to account for KAFKA-242. However, to no avail.
>
> Therefore it appears that without the resolution of 242, the
> consumer.timeout.ms parameter would not work.
>
> Thoughts/workaround?
>
> My current workaround lives inside the while(true){} loop, but it is not
> ideal: I have to increment a counter and check it at every iteration of
> while(it.hasNext()).
>
> Regards
> Milind
>
>
> On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
>> You are probably hitting this bug in Kafka -
>> https://issues.apache.org/jira/browse/KAFKA-242
>>
>> Thanks,
>> Neha
>>
>> 2012/3/4  <zlai_2001@sina.com>:
>> > I modified the code as below. When the program calls createMessageStreams
>> > again, it cannot get any messages, although there are some new messages.
>> > How can I resolve this? Thanks!
>> >
>> > ConsumerConnector consumer =
>> >     kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
>> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>> > topicCountMap.put("topic", new Integer(1));
>> > while (true) {
>> >     Map<String, List<KafkaMessageStream<Message>>> consumerMap =
>> >         consumer.createMessageStreams(topicCountMap);
>> >     KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
>> >     ConsumerIterator<Message> it = stream.iterator();
>> >     try {
>> >         while (it.hasNext()) {
>> >             ByteBuffer buffer = it.next().payload();
>> >             byte[] bytes = new byte[buffer.remaining()];
>> >             buffer.get(bytes);
>> >             System.out.println(new String(bytes));
>> >         }
>> >     } catch (ConsumerTimeoutException e) {
>> >         e.printStackTrace();
>> >     } catch (Exception e) {
>> >         e.printStackTrace();
>> >     }
>> > }
>> >
>> >
>> >
>> >
>>
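
The counter-style workaround Milind mentions earlier in the thread might look roughly like the sketch below. It is only an illustration of the pattern (count messages inside the loop and stop on a limit or a timeout), not code from the thread and not Kafka's API: a plain java.util.concurrent.BlockingQueue stands in for the consumer stream, and the names BoundedDrain, drain, maxMessages and timeoutMs are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a consumer loop; the queue plays the role of the stream.
public class BoundedDrain {

    // Reads at most maxMessages, giving up as soon as one poll waits longer
    // than timeoutMs without receiving anything.
    static List<String> drain(BlockingQueue<String> queue, int maxMessages, long timeoutMs) {
        List<String> received = new ArrayList<>();
        int count = 0; // the counter that is checked on every iteration
        while (count < maxMessages) {
            String message;
            try {
                message = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (message == null) {
                break; // timed out: no message arrived within timeoutMs
            }
            received.add(message);
            count++;
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        System.out.println(drain(queue, 2, 50)); // stops at the counter limit
        System.out.println(drain(queue, 2, 50)); // stops at the timeout
    }
}
```

The caller regains control after either a fixed number of messages or a quiet period, without needing to create a new stream on every pass.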
