kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: High Level Consumer Iterator IllegalStateException Issue
Date Tue, 04 Nov 2014 17:16:24 GMT
Hi Neha and Jun,

I have fixed the issue on my side, based on what Jun mentioned: "next()
gives IllegalStateException if hasNext is not called..."  After further
debugging, I found it was my mistake: I was sharing the same consumer
iterator across multiple threads (I forgot to call iterator.remove() in my
registry, so every thread was handed the first consumer iterator).  Because
multiple threads shared that one consumer iterator, the exception occurred
intermittently.

This issue is resolved. Thank you very much for your support.

Thanks,

Bhavesh
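
A minimal sketch of the fix described above, assuming a hypothetical
IteratorRegistry class (the real KafkaStreamRegistory code is not shown in
this thread). Rather than reading the head of a shared list, each thread
claims an iterator by removing it from a thread-safe queue, so no two threads
can end up holding the same one. Plain java.util.Iterator stands in for
Kafka's ConsumerIterator to keep the example self-contained.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical registry: every registered iterator is handed out at most once. */
final class IteratorRegistry<T> {
    // Thread-safe queue of iterators that no thread has claimed yet.
    private final Queue<Iterator<T>> unclaimed = new ConcurrentLinkedQueue<>();

    /** Adds all iterators to the pool of unclaimed iterators. */
    void register(List<Iterator<T>> iterators) {
        unclaimed.addAll(iterators);
    }

    /**
     * Removes and returns the next unclaimed iterator, or null if none are left.
     * poll() removes the element atomically, so two concurrent callers
     * can never receive the same iterator.
     */
    Iterator<T> claim() {
        return unclaimed.poll();
    }
}
```

Each reader thread would call claim() once at startup and keep the returned
iterator to itself; a null result means there are more threads than streams.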

On Mon, Nov 3, 2014 at 4:35 PM, Jun Rao <junrao@gmail.com> wrote:

> Bhavesh,
>
> That example has a lot of code. Could you provide a simpler test that
> demonstrates the problem?
>
> Thanks,
>
> Jun
>
> On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Here is the code base:
> >
> > https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
> >
> > Please let me know if you can help me determine the root cause.  Why
> > is there an illegal state and blocking?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao <junrao@gmail.com> wrote:
> >
> > > Do you have a simple test that can reproduce this issue?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > The consumer connector is not closed, because I can see the
> > > > ConsumerFetcher thread alive but blocked on *put*, while hasNext() is
> > > > blocked on *take*.  This is what I see after recovery.
> > > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <junrao@gmail.com> wrote:
> > > >
> > > > > Another possibility is that the consumer connector is already closed
> > > > > and then you call hasNext() on the iterator.
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > hasNext() itself throws this error.  I have to reset the state
> > > > > > manually; sometimes it is able to recover and other times it is
> > > > > > not.  Any other clue?
> > > > > >
> > > > > >         public boolean hasNext() {
> > > > > >             LOG.info("called of  hasNext() :");
> > > > > >             int retry = 3;
> > > > > >             while(retry > 0){
> > > > > >                 try{
> > > > > >                     // this hasNext is blocking call..
> > > > > >                     boolean result = iterator.hasNext();
> > > > > >                     return result;
> > > > > >                 }catch(IllegalStateException exp){
> > > > > >                     iterator.resetState();
> > > > > >                     LOG.error("GOT IllegalStateException arg trying to recover....", exp);
> > > > > >                     retry--;
> > > > > >                 }
> > > > > >             }
> > > > > >             return false;
> > > > > >         }
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > > >
> > > > > > > The IllegalStateException typically happens if you call next() before
> > > > > > > hasNext() on the iterator.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Neha,
> > > > > > > >
> > > > > > > > Thanks for your answer.  Can you please let me know how I can resolve the
> > > > > > > > iterator IllegalStateException?  If this is a bug, I can file one; otherwise
> > > > > > > > please let me know if it is specific to my use case.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Bhavesh
> > > > > > > >
> > > > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > queued.max.message.chunks controls the consumer's fetcher queue.
> > > > > > > > >
> > > > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > HI Neha,
> > > > > > > > > >
> > > > > > > > > > If I solve problem number 1, I think number 2 will be solved as well
> > > > > > > > > > (problem 1 is causing problem 2, the blocking).  Can you please let me
> > > > > > > > > > know what controls the queue size for the *ConsumerFetcherThread* thread?
> > > > > > > > > >
> > > > > > > > > > Please see the attached Java source code, which will reproduce the
> > > > > > > > > > problem.  You may remove the recovery process...  Please check.  We have
> > > > > > > > > > to do some work before we start reading from the Kafka stream iterator,
> > > > > > > > > > and this seems to cause some issue with java.lang.IllegalStateException:
> > > > > > > > > > Iterator is in failed state.
> > > > > > > > > > Please let me know your finding and recommendation.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Bhavesh
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > >> >> Sometime it give following exception.
> > > > > > > > > > >>
> > > > > > > > > > >> It will help to have a more specific test case that reproduces the failed
> > > > > > > > > > >> iterator state.
> > > > > > > > > > >>
> > > > > > > > > > >> Also, the consumer threads block if the fetcher queue is full. The queue
> > > > > > > > > > >> can fill up if your consumer thread dies or slows down. I'd recommend you
> > > > > > > > > > >> ensure that all your consumer threads are alive. You can take a thread dump
> > > > > > > > > > >> to verify this.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Neha
> > > > > > > > > >>
> > > > > > > > > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > Hi Neha,
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > I have two problems.  Any help is greatly appreciated.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
> > > > > > > > > >> >
> > > > > > > > > > >> >         ConsumerConnector consumerConnector = Consumer
> > > > > > > > > > >> >                 .createJavaConsumerConnector(getConsumerConfig());
> > > > > > > > > > >> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > > > > > > > >> >         topicCountMap.put(topic, *32*);
> > > > > > > > > > >> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap = consumerConnector
> > > > > > > > > > >> >                 .createMessageStreams(topicCountMap);
> > > > > > > > > > >> >
> > > > > > > > > > >> >         List<KafkaStream<byte[], byte[]>> streams =
> > > > > > > > > > >> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> > > > > > > > > > >> >
> > > > > > > > > > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > > > > > > > > > >> >
> > > > > > > > > > >> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> > > > > > > > > > >> >         // remove the head first list for this source...rest are for the Dynamic Souce...
> > > > > > > > > > >> >         mainIterator = iterator.next().iterator();
> > > > > > > > > > >> >
> > > > > > > > > > >> >         List<ConsumerIterator<byte[], byte[]>> iteratorList = new
> > > > > > > > > > >> >                 ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
> > > > > > > > > > >> >         // now rest of the iterator must be registered now..
> > > > > > > > > > >> >         while(iterator.hasNext()){
> > > > > > > > > > >> >             iteratorList.add(iterator.next().iterator());
> > > > > > > > > > >> >         }
> > > > > > > > > > >> >         *KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);*
> > > > > > > > > >> >
> > > > > > > > > > >> > Once the consumer iterators are created and registered, we use them in
> > > > > > > > > > >> > another thread to start reading from the consumer iterator.  Sometimes it
> > > > > > > > > > >> > gives the following exception:
> > > > > > > > > > >> >
> > > > > > > > > > >> > 24 Oct 2014 16:03:25,923 ERROR
> > > > > > > > > > >> > [SourceReader:request_source:LogStreamKafkaSource1]
> > > > > > > > > > >> > (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
> > > > > > > > > > >> > Swallowed to continue next read.
> > > > > > > > > > >> > java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > > > >> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > I have tried to recover from this state by calling iterator.resetState(),
> > > > > > > > > > >> > but sometimes it does not recover.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > *2) ConsumerFetcherThreads are blocked on enqueue.  What controls the size
> > > > > > > > > > >> > of the queue?  Why are they blocked?*  Due to this our lag is increasing,
> > > > > > > > > > >> > and our threads are blocked on hasNext()...
> > > > > > > > > > >> >
> > > > > > > > > > >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
> > > > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > > >> >         - parking to wait for  <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > > > >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > > > >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > > > >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > > > >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > > > >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > > > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > > > >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > > > >> >
> > > > > > > > > > >> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
> > > > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > > >> >         - parking to wait for  <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > > > >> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > > > >> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > > > >> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > > > >> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > > > >> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > > > >> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > > > >> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > > > >> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> >
> > > > > > > > > >> > Bhavesh
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > Can you provide the steps to reproduce this issue?
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > > I am using one from the Kafka Trunk branch.
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Bhavesh
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > > Which version of Kafka are you using on the consumer?
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > > Hi Kafka Community,
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > I am using the Kafka trunk source code and I get the following exception.
> > > > > > > > > > >> > > > > > What could cause the iterator to be in the FAILED state?  Please let me know
> > > > > > > > > > >> > > > > > how I can fix this issue.
> > > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > > > >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > > > > > >> > > > > > Here is Properties:
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > >         Properties props = new Properties();
> > > > > > > > > > >> > > > > >         props.put("zookeeper.connect", zkConnect);
> > > > > > > > > > >> > > > > >         props.put("group.id", groupId);
> > > > > > > > > > >> > > > > > *        props.put("consumer.timeout.ms", "-1");*
> > > > > > > > > > >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > > > > > > > > >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > > > > > > > > >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > > > > > > > > > >> > > > > >         props.put("rebalance.max.retries", "8");
> > > > > > > > > > >> > > > > >         props.put("auto.offset.reset", "largest");
> > > > > > > > > > >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > > > > > > > > > >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > > > > > > > > >> > > > > >         props.put("auto.commit.enable","true");
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Bhavesh
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
