kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patricio Echagüe <patric...@gmail.com>
Subject Re: Consumer iterator error
Date Thu, 05 Jan 2012 23:25:44 GMT
I wrote this small test case: http://pastebin.com/Lv1wgK36

First time there is a iterator timeout, my code catches it (line 70). The
second iteration of the while loop falls into the IllegalStateException
after requesting a new iterator from the same stream.

2012/1/5 Patricio Echagüe <patricioe@gmail.com>

> Jun and Neha, Thanks again for the quick response.
>
> I use Kafka 0.7
>
> What I'm trying to achieve is to not block on the iterator. So I set up a
> consumer.timeout.ms=300. The pseudo code is something like this:
>
> private void myMethod() {
>   try {
>
>       for(Message message: stream) {
>
>             System.out.println("Message received: " + message.toString());
>
>         }
>
>   } catch (ConsumerTimeoutException e) {
>
>    // do something
>
>   }
>
> }
>
>
> when a timeout occurs, next time this method is called it does
> stream.iterator() (see the "for" loop in my snippet). So answering Neha's
> question, I do create the iterator again. What I don't re-create is the
> MessageStream. Do I need to re-create the stream as well ?
>
>
> Just requesting a new iterator after a timeout doesn't seem to help.
>
> On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <junrao@gmail.com> wrote:
>
>> Actually, the consumer stream is supposed to be re-iterable after the
>> timeout. But make sure that you check hasNext before calling next when you
>> resume the consumption.
>>
>> Also, could you try this on the 0.7 release? We fixed a bunch of issues
>> between 0.6 and 0.7.
>>
>> Thanks,
>>
>> Jun
>>
>> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <neha.narkhede@gmail.com
>> >wrote:
>>
>> > Hi Patricio,
>> >
>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
>> for
>> > the
>> > > iterator from the same strem, I get a:
>> >
>> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300. If
>> > you do this, it just means that the consumer iterators will shut down
>> > if they don't get another set of messages from Kafka within 300 ms.
>> > Since the iterators shut down, it is illegal to call hasNext()/next()
>> > on the iterators, without recreating them. The way to recreate the
>> > iterators is via the createMessageStreams() API in
>> > ZookeeperConsumerConnector.
>> >
>> > Thanks,
>> > Neha
>> >
>> > 2012/1/5 Patricio Echagüe <patricioe@gmail.com>:
>> > > Hi again. I think I'm running into the Iterator issue mentioned here:
>> > >
>> > >
>> >
>> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E
>> > >
>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
>> for
>> > the
>> > > iterator from the same strem, I get a:
>> > >
>> > > java.lang.IllegalStateException: Iterator is in failed state
>> > >
>> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47)
>> > >
>> > > at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext(
>> > > KafkaConsumerIterator.java:21)
>> > >
>> > > .....
>> > >
>> > > .....
>> > >
>> > > Note: I'm using the latest Kafka release.
>> > >
>> > > Any suggestion ?
>> > >
>> > > Thanks
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message