kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Patricio Echagüe <patric...@gmail.com>
Subject Re: Consumer iterator error
Date Fri, 06 Jan 2012 00:44:12 GMT
No problem. I'll do that.

Thank you guys.

On Thu, Jan 5, 2012 at 4:31 PM, Neha Narkhede <neha.narkhede@gmail.com>wrote:

> Jun,
>
> I agree that the iterator should be re-iterable after a timeout, but I
> was referring to its current behavior, which does not allow
> re-iteration after a timeout (look at maybeComputeNext())
> Today, the only way to re-iterate after a timeout is to re-create the
> KafkaMessageStream, which essentially means we don't allow
> re-iteration from a timed-out iterator.
>
> Patricio,
>
> >> So answering Neha's question, I do create the iterator again.
>
> The bug is that under the covers, we don't create a new iterator, we
> reuse the iterator which entered a failed state after the timeout.
> Hence, you see the IllegalStateException.
> Thanks for writing up that test. Do you mind filing a bug and
> attaching your test there ?
>
> -Neha
>
> 2012/1/5 Patricio Echagüe <patricioe@gmail.com>:
> > I wrote this small test case: http://pastebin.com/Lv1wgK36
> >
> > First time there is a iterator timeout, my code catches it (line 70). The
> > second iteration of the while loop falls into the IllegalStateException
> > after requesting a new iterator from the same stream.
> >
> > 2012/1/5 Patricio Echagüe <patricioe@gmail.com>
> >
> >> Jun and Neha, Thanks again for the quick response.
> >>
> >> I use Kafka 0.7
> >>
> >> What I'm trying to achieve is to not block on the iterator. So I set up
> a
> >> consumer.timeout.ms=300. The pseudo code is something like this:
> >>
> >> private void myMethod() {
> >>   try {
> >>
> >>       for(Message message: stream) {
> >>
> >>             System.out.println("Message received: " +
> message.toString());
> >>
> >>         }
> >>
> >>   } catch (ConsumerTimeoutException e) {
> >>
> >>    // do something
> >>
> >>   }
> >>
> >> }
> >>
> >>
> >> when a timeout occurs, next time this method is called it does
> >> stream.iterator() (see the "for" loop in my snippet). So answering
> Neha's
> >> question, I do create the iterator again. What I don't re-create is the
> >> MessageStream. Do I need to re-create the stream as well ?
> >>
> >>
> >> Just requesting a new iterator after a timeout doesn't seem to help.
> >>
> >> On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <junrao@gmail.com> wrote:
> >>
> >>> Actually, the consumer stream is supposed to be re-iterable after the
> >>> timeout. But make sure that you check hasNext before calling next when
> you
> >>> resume the consumption.
> >>>
> >>> Also, could you try this on the 0.7 release? We fixed a bunch of issues
> >>> between 0.6 and 0.7.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <
> neha.narkhede@gmail.com
> >>> >wrote:
> >>>
> >>> > Hi Patricio,
> >>> >
> >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time
I ask
> >>> for
> >>> > the
> >>> > > iterator from the same strem, I get a:
> >>> >
> >>> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300.
> If
> >>> > you do this, it just means that the consumer iterators will shut down
> >>> > if they don't get another set of messages from Kafka within 300 ms.
> >>> > Since the iterators shut down, it is illegal to call hasNext()/next()
> >>> > on the iterators, without recreating them. The way to recreate the
> >>> > iterators is via the createMessageStreams() API in
> >>> > ZookeeperConsumerConnector.
> >>> >
> >>> > Thanks,
> >>> > Neha
> >>> >
> >>> > 2012/1/5 Patricio Echagüe <patricioe@gmail.com>:
> >>> > > Hi again. I think I'm running into the Iterator issue mentioned
> here:
> >>> > >
> >>> > >
> >>> >
> >>>
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E
> >>> > >
> >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time
I ask
> >>> for
> >>> > the
> >>> > > iterator from the same strem, I get a:
> >>> > >
> >>> > > java.lang.IllegalStateException: Iterator is in failed state
> >>> > >
> >>> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47)
> >>> > >
> >>> > > at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext(
> >>> > > KafkaConsumerIterator.java:21)
> >>> > >
> >>> > > .....
> >>> > >
> >>> > > .....
> >>> > >
> >>> > > Note: I'm using the latest Kafka release.
> >>> > >
> >>> > > Any suggestion ?
> >>> > >
> >>> > > Thanks
> >>> >
> >>>
> >>
> >>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message