kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: High Level Consumer Iterator IllegalStateException Issue
Date Tue, 04 Nov 2014 00:35:28 GMT
Bhavesh,

That example has a lot of code. Could you provide a simpler test that
demonstrates the problem?

Thanks,

Jun

On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com
> wrote:

> Hi Jun,
>
> Here is code base:
>
> https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
>
> Please let me know if you can help me determine  the root cause.   Why
> there is illegal state and blocking ?
>
> Thanks,
>
> Bhavesh
>
> On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao <junrao@gmail.com> wrote:
>
> > Do you have a simple test that can reproduce this issue?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> > mistry.p.bhavesh@gmail.com>
> > wrote:
> >
> > > HI Jun,
> > >
> > > Consumer Connector is not closed because I can see the ConsumerFetcher
> > > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > > This is what I see after recovery.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > Another possibility is that the consumer connector is already closed
> > and
> > > > then you call hasNext() on the iterator.
> > > >
> > > > Thanks,
> > > >
> > > >
> > > > Jun
> > > >
> > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > > mistry.p.bhavesh@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > The hasNext() itself throws this error.  I have to manually reset
> > state
> > > > and
> > > > > sometime it is able to recover and other it is not. Any other clue
> ?
> > > > >
> > > > >         public boolean hasNext() {
> > > > >             LOG.info("called of  hasNext() :");
> > > > >             int retry = 3;
> > > > >             while(retry > 0){
> > > > >                 try{
> > > > >                     // this hasNext is blocking call..
> > > > >                     boolean result = iterator.hasNext();
> > > > >                     return result;
> > > > >                 }catch(IllegalStateException exp){
> > > > >                     iterator.resetState();
> > > > >                     LOG.error("GOT IllegalStateException arg trying
> > to
> > > > > recover....", exp);
> > > > >                     retry--;
> > > > >                 }
> > > > >             }
> > > > >             return false;
> > > > >         }
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <junrao@gmail.com>
wrote:
> > > > >
> > > > > > The IllegalStateException typically happens if you call next()
> > before
> > > > > > hasNext() on the iterator.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > > > > mistry.p.bhavesh@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Neha,
> > > > > > >
> > > > > > > Thanks for your answer.  Can you please let me know how
I can
> > > resolve
> > > > > the
> > > > > > > Iterator IllegalStateException ?  I would appreciate your
is
> this
> > > is
> > > > > bug
> > > > > > I
> > > > > > > can file one or let me know if this is use case specific
?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > > > > neha.narkhede@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > queued.max.message.chunks controls the consumer's
fetcher
> > queue.
> > > > > > > >
> > > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > HI Neha,
> > > > > > > > >
> > > > > > > > > If I solved the problem number 1 think and number
2 will be
> > > > solved
> > > > > > > (prob
> > > > > > > > > 1 is causing problem number 2(blocked)).  Can
you please
> let
> > me
> > > > > know
> > > > > > > what
> > > > > > > > > controls the queue size for *ConsumerFetcherThread*
thread
> ?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Please see the attached java source code which
will
> reproduce
> > > the
> > > > > > > > > problem.  You may remove the recovery process...
 Please
> > check.
> > > > We
> > > > > > > have
> > > > > > > > to
> > > > > > > > > do some work before we start reading from Kafka
Stream
> > > Interator
> > > > > and
> > > > > > > this
> > > > > > > > > seems to cause some issue with java.lang.
> > > > > > > > > IllegalStateException: Iterator is in failed
state*.
> > > > > > > > >
> > > > > > > > > Please let me know your finding and recommendation.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede
<
> > > > > > > neha.narkhede@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> >> Sometime it give following exception.
> > > > > > > > >>
> > > > > > > > >> It will help to have a more specific test
case that
> > reproduces
> > > > the
> > > > > > > > failed
> > > > > > > > >> iterator state.
> > > > > > > > >>
> > > > > > > > >> Also, the consumer threads block if the fetcher
queue is
> > full.
> > > > The
> > > > > > > queue
> > > > > > > > >> can fill up if your consumer thread dies
or slows down.
> I'd
> > > > > > recommend
> > > > > > > > you
> > > > > > > > >> ensure that all your consumer threads are
alive. You can
> > take
> > > a
> > > > > > thread
> > > > > > > > >> dump
> > > > > > > > >> to verify this.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Neha
> > > > > > > > >>
> > > > > > > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh
Mistry <
> > > > > > > > >> mistry.p.bhavesh@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi Neha,
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I have two problems:.  Any help is greatly
appreciated.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > 1)* java.lang.IllegalStateException:
Iterator is in
> failed
> > > > > state*
> > > > > > > > >> >
> > > > > > > > >> >        ConsumerConnector  consumerConnector
= Consumer
> > > > > > > > >> >
> > > > >  .createJavaConsumerConnector(getConsumerConfig());
> > > > > > > > >> >         Map<String, Integer> topicCountMap
= new
> > > > HashMap<String,
> > > > > > > > >> > Integer>();
> > > > > > > > >> >         topicCountMap.put(topic, *32*);
> > > > > > > > >> >         Map<String, List<KafkaStream<byte[],
byte[]>>>
> > > > > > > topicStreamMap
> > > > > > > > =
> > > > > > > > >> > consumerConnector
> > > > > > > > >> >                 .createMessageStreams(topicCountMap);
> > > > > > > > >> >
> > > > > > > > >> >         List<KafkaStream<byte[],
byte[]>> streams =
> > > > > > > > >> > Collections.synchronizedList(topicStreamMap.get(topic));
> > > > > > > > >> >
> > > > > > > > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > > > > > > > >> >
> > > > > > > > >> >         Iterator<KafkaStream<byte[],
byte[]>> iterator =
> > > > > > > > >> > streams.iterator();
> > > > > > > > >> >         // remove the head first list
for this
> > source...rest
> > > > are
> > > > > > for
> > > > > > > > the
> > > > > > > > >> > Dynamic Souce...
> > > > > > > > >> >         mainIterator = iterator.next().iterator();
> > > > > > > > >> >
> > > > > > > > >> >         List<ConsumerIterator<byte[],
byte[]>>
> > iteratorList
> > > =
> > > > > new
> > > > > > > > >> >
> > ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
> > > > > > > > >> >         // now rest of the iterator
must be registered
> > now..
> > > > > > > > >> >         while(iterator.hasNext()){
> > > > > > > > >> >
>  iteratorList.add(iterator.next().iterator());
> > > > > > > > >> >         }
> > > > > > > > >> >
> >  *KafkaStreamRegistory.registerStream(mainSourceName,
> > > > > > > > >> > iteratorList);*
> > > > > > > > >> >
> > > > > > > > >> > Once the Consumer iterator is created
and registered.
> We
> > > use
> > > > > this
> > > > > > > in
> > > > > > > > >> > another thread to start reading from
the Consumer
> > Iterator.
> > > > > > >  Sometime
> > > > > > > > >> it
> > > > > > > > >> > give following exception.
> > > > > > > > >> >
> > > > > > > > >> > 24 Oct 2014 16:03:25,923 ERROR
> > > > > > > > >> > [SourceReader:request_source:LogStreamKafkaSource1]
> > > > > > > > >> > (grizzled.slf4j.Logger.error:116)  -
SourceThread:
> > exception
> > > > > > during
> > > > > > > > >> reads.
> > > > > > > > >> > Swallowed to continue next read.
> > > > > > > > >> > java.lang.IllegalStateException: Iterator
is in failed
> > state
> > > > > > > > >> >     at
> > > > > > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I have tried to recover from this state
by using this:
> > > > > > > > >> > iterator.resetState(); but it does not
recover sometime.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > *2) ConsumerFetcherThread are blocked
on enqueue ?  What
> > > > > controls
> > > > > > > size
> > > > > > > > >> of
> > > > > > > > >> > queue ? Why are they blocked ?  *Due
to this our lags
> are
> > > > > > > increasing.
> > > > > > > > >> our
> > > > > > > > >> > threads blocked on hasNext()...
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > > > > > > > >> > prio=5 tid=0x00007fb36292c800 nid=0xab03
waiting on
> > > condition
> > > > > > > > >> > [0x0000000116379000]
> > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > >> >         at sun.misc.Unsafe.park(Native
Method)
> > > > > > > > >> >         - parking to wait for  <0x0000000704019388>
(a
> > > > > > > > >> >
> > > > > > >
> > > >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > >> >         at
> > > > > > > > >>
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
> > > > > > > > >> > prio=5 tid=0x00007fb36229e000 nid=0xa903
waiting on
> > > condition
> > > > > > > > >> > [0x0000000116276000]
> > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > >> >         at sun.misc.Unsafe.park(Native
Method)
> > > > > > > > >> >         - parking to wait for  <0x0000000704064ce0>
(a
> > > > > > > > >> >
> > > > > > >
> > > >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > >> >         at
> > > > > > > > >>
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Bhavesh
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha
Narkhede <
> > > > > > > > neha.narkhede@gmail.com
> > > > > > > > >> >
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Can you provide the steps to reproduce
this issue?
> > > > > > > > >> > >
> > > > > > > > >> > > On Fri, Oct 24, 2014 at 6:11 PM,
Bhavesh Mistry <
> > > > > > > > >> > > mistry.p.bhavesh@gmail.com>
> > > > > > > > >> > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > I am using one from the Kafka
Trunk branch.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Bhavesh
> > > > > > > > >> > > >
> > > > > > > > >> > > > On Fri, Oct 24, 2014 at 5:24
PM, Neha Narkhede <
> > > > > > > > >> > neha.narkhede@gmail.com>
> > > > > > > > >> > > > wrote:
> > > > > > > > >> > > >
> > > > > > > > >> > > > > Which version of Kafka
are you using on the
> > consumer?
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Fri, Oct 24, 2014
at 4:14 PM, Bhavesh Mistry <
> > > > > > > > >> > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > > >> > > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > HI Kafka Community
,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > I am using kafka
trunk source code and I get
> > > following
> > > > > > > > >> exception.
> > > > > > > > >> > > What
> > > > > > > > >> > > > > > could cause the
iterator to have FAILED state.
> > > Please
> > > > > let
> > > > > > > me
> > > > > > > > >> know
> > > > > > > > >> > > how
> > > > > > > > >> > > > I
> > > > > > > > >> > > > > > can fix this issue.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > *java.lang.IllegalStateException:
Iterator is in
> > > > failed
> > > > > > > state
> > > > > > > > >>   at
> > > > > > > > >> > > > > >
> > > > > > > >
> > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > > > > >> > > > > > Here is Properties:
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >         Properties
props = new Properties();
> > > > > > > > >> > > > > >         props.put("zookeeper.connect",
> zkConnect);
> > > > > > > > >> > > > > >         props.put("group.id",
groupId);
> > > > > > > > >> > > > > > *        props.put("consumer.timeout.ms
<
> > > > > > > > >> > http://consumer.timeout.ms
> > > > > > > > >> > > >",
> > > > > > > > >> > > > > > "-1");*
> > > > > > > > >> > > > > >         props.put("zookeeper.session.timeout.ms
> ",
> > > > > > "10000");
> > > > > > > > >> > > > > >         props.put("zookeeper.sync.time.ms",
> > > "6000");
> > > > > > > > >> > > > > >         props.put("auto.commit.interval.ms",
> > > "2000");
> > > > > > > > >> > > > > >         props.put("rebalance.max.retries",
"8");
> > > > > > > > >> > > > > >         props.put("auto.offset.reset",
> "largest");
> > > > > > > > >> > > > > >
> > > >  props.put("fetch.message.max.bytes","2097152");
> > > > > > > > >> > > > > >
> > > > > >  props.put("socket.receive.buffer.bytes","2097152");
> > > > > > > > >> > > > > >         props.put("auto.commit.enable","true");
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Bhavesh
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message