kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Consumer iterator error
Date Fri, 06 Jan 2012 00:31:30 GMT
Jun,

I agree that the iterator should be re-iterable after a timeout, but I
was referring to its current behavior, which does not allow
re-iteration after a timeout (look at maybeComputeNext())
Today, the only way to re-iterate after a timeout is to re-create the
KafkaMessageStream, which essentially means we don't allow
re-iteration from a timed-out iterator.

Patricio,

>> So answering Neha's question, I do create the iterator again.

The bug is that under the covers, we don't create a new iterator, we
reuse the iterator which entered a failed state after the timeout.
Hence, you see the IllegalStateException.
Thanks for writing up that test. Do you mind filing a bug and
attaching your test there ?

-Neha

2012/1/5 Patricio Echagüe <patricioe@gmail.com>:
> I wrote this small test case: http://pastebin.com/Lv1wgK36
>
> First time there is a iterator timeout, my code catches it (line 70). The
> second iteration of the while loop falls into the IllegalStateException
> after requesting a new iterator from the same stream.
>
> 2012/1/5 Patricio Echagüe <patricioe@gmail.com>
>
>> Jun and Neha, Thanks again for the quick response.
>>
>> I use Kafka 0.7
>>
>> What I'm trying to achieve is to not block on the iterator. So I set up a
>> consumer.timeout.ms=300. The pseudo code is something like this:
>>
>> private void myMethod() {
>>   try {
>>
>>       for(Message message: stream) {
>>
>>             System.out.println("Message received: " + message.toString());
>>
>>         }
>>
>>   } catch (ConsumerTimeoutException e) {
>>
>>    // do something
>>
>>   }
>>
>> }
>>
>>
>> when a timeout occurs, next time this method is called it does
>> stream.iterator() (see the "for" loop in my snippet). So answering Neha's
>> question, I do create the iterator again. What I don't re-create is the
>> MessageStream. Do I need to re-create the stream as well ?
>>
>>
>> Just requesting a new iterator after a timeout doesn't seem to help.
>>
>> On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <junrao@gmail.com> wrote:
>>
>>> Actually, the consumer stream is supposed to be re-iterable after the
>>> timeout. But make sure that you check hasNext before calling next when you
>>> resume the consumption.
>>>
>>> Also, could you try this on the 0.7 release? We fixed a bunch of issues
>>> between 0.6 and 0.7.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <neha.narkhede@gmail.com
>>> >wrote:
>>>
>>> > Hi Patricio,
>>> >
>>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
>>> for
>>> > the
>>> > > iterator from the same strem, I get a:
>>> >
>>> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300. If
>>> > you do this, it just means that the consumer iterators will shut down
>>> > if they don't get another set of messages from Kafka within 300 ms.
>>> > Since the iterators shut down, it is illegal to call hasNext()/next()
>>> > on the iterators, without recreating them. The way to recreate the
>>> > iterators is via the createMessageStreams() API in
>>> > ZookeeperConsumerConnector.
>>> >
>>> > Thanks,
>>> > Neha
>>> >
>>> > 2012/1/5 Patricio Echagüe <patricioe@gmail.com>:
>>> > > Hi again. I think I'm running into the Iterator issue mentioned here:
>>> > >
>>> > >
>>> >
>>> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E
>>> > >
>>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
>>> for
>>> > the
>>> > > iterator from the same strem, I get a:
>>> > >
>>> > > java.lang.IllegalStateException: Iterator is in failed state
>>> > >
>>> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47)
>>> > >
>>> > > at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext(
>>> > > KafkaConsumerIterator.java:21)
>>> > >
>>> > > .....
>>> > >
>>> > > .....
>>> > >
>>> > > Note: I'm using the latest Kafka release.
>>> > >
>>> > > Any suggestion ?
>>> > >
>>> > > Thanks
>>> >
>>>
>>
>>

Mime
View raw message