kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: High Level Consumer Iterator IllegalStateException Issue
Date Mon, 27 Oct 2014 21:14:26 GMT
Hi Neha,


I have two problems. Any help is greatly appreciated.


1) *java.lang.IllegalStateException: Iterator is in failed state*

        ConsumerConnector consumerConnector =
                Consumer.createJavaConsumerConnector(getConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 32); // request 32 streams for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
                consumerConnector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams =
                Collections.synchronizedList(topicStreamMap.get(topic));

        AppStaticInfo info = Mupd8Main.STATICINFO();

        Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
        // Take the first stream for this source; the rest are for the
        // dynamic sources.
        mainIterator = iterator.next().iterator();

        List<ConsumerIterator<byte[], byte[]>> iteratorList =
                new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
        // Register the remaining iterators.
        while (iterator.hasNext()) {
            iteratorList.add(iterator.next().iterator());
        }
        KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);

Once the consumer iterators are created and registered, we use them in
another thread to start reading. Sometimes this throws the following
exception:

24 Oct 2014 16:03:25,923 ERROR [SourceReader:request_source:LogStreamKafkaSource1] (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during reads. Swallowed to continue next read.
java.lang.IllegalStateException: Iterator is in failed state
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)


I have tried to recover from this state by calling iterator.resetState(),
but sometimes it does not recover.
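
For reference, a minimal Java sketch of how such a failed state can arise. It assumes the iterator marks itself FAILED before computing the next element and only leaves that state when the computation returns normally, so any exception thrown while fetching the next element leaves it FAILED. This is a simplified model for illustration, not Kafka's actual IteratorTemplate; the names SketchIterator and FailedStateDemo are hypothetical.

```java
import java.util.NoSuchElementException;

// Simplified, hypothetical model of a template-style iterator.
// It is NOT Kafka's IteratorTemplate; it only illustrates the state machine.
abstract class SketchIterator<T> {
    enum State { NOT_READY, READY, DONE, FAILED }

    private State state = State.NOT_READY;
    private T nextItem;

    // Subclasses return the next element, or call allDone() when exhausted.
    protected abstract T makeNext();

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    public boolean hasNext() {
        if (state == State.FAILED) {
            throw new IllegalStateException("Iterator is in failed state");
        }
        switch (state) {
            case DONE:  return false;
            case READY: return true;
            default:    return maybeComputeNext();
        }
    }

    private boolean maybeComputeNext() {
        state = State.FAILED;      // stays FAILED if makeNext() throws
        nextItem = makeNext();
        if (state == State.DONE) {
            return false;
        }
        state = State.READY;
        return true;
    }

    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        state = State.NOT_READY;
        T item = nextItem;
        nextItem = null;
        return item;
    }

    // Clears the FAILED flag; it does not remove whatever caused the failure.
    public void resetState() {
        state = State.NOT_READY;
    }
}

final class FailedStateDemo {
    // Builds a short trace of what happens when makeNext() throws once.
    static String run() {
        final int[] calls = {0};
        SketchIterator<String> it = new SketchIterator<String>() {
            @Override
            protected String makeNext() {
                if (calls[0]++ == 0) {
                    throw new RuntimeException("simulated interrupted read");
                }
                return "message";
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            it.hasNext();
        } catch (RuntimeException e) {
            trace.append("first=").append(e.getMessage());
        }
        try {
            it.hasNext();
        } catch (IllegalStateException e) {
            trace.append("; second=").append(e.getMessage());
        }
        it.resetState();
        trace.append("; afterReset=").append(it.next());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this model resetState() only clears the flag. If the underlying cause persists (for example, the reading thread keeps getting interrupted, or more than one thread drives the same iterator), the next call can fail again, which would be consistent with a reset that only helps some of the time.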




2) *ConsumerFetcherThreads are blocked on enqueue. What controls the size of
the queue? Why are they blocked?* Because of this our lag is increasing. Our
threads are blocked on hasNext()...

"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1" prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

"ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2" prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000704064ce0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
        at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
        at kafka.utils.Utils$.inLock(Utils.scala:535)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
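
Both traces above are parked in LinkedBlockingQueue.put(), called from PartitionTopicInfo.enqueue(). With a bounded LinkedBlockingQueue, put() waits whenever the queue is at capacity, so a fetcher parked there usually means the reading side is not draining its queue fast enough. The sketch below reproduces that behavior with a plain bounded queue and no Kafka classes; BoundedQueueDemo is a hypothetical name. As an assumption worth checking against the ConsumerConfig of the build in use, the capacity of this queue in the 0.8-era high-level consumer appears to be set by the queued.max.message.chunks property.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it models the fetcher/consumer hand-off with a
// plain bounded queue and does not use any Kafka classes.
final class BoundedQueueDemo {

    // Returns {acceptedWhileFull, acceptedAfterDrain} for the given capacity.
    static boolean[] demo(int capacity) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        try {
            // "Fetcher" side: fill the queue up to its bound. These puts do not block.
            for (int i = 0; i < capacity; i++) {
                queue.put("chunk-" + i);
            }
            // The queue is full and nobody is reading. A plain put() would park
            // here indefinitely, so a timed offer() is used to observe the same
            // condition without hanging the demo.
            boolean acceptedWhileFull =
                    queue.offer("extra", 100, TimeUnit.MILLISECONDS);

            // "Consumer" side: taking one element frees a slot...
            queue.take();
            // ...and the producer can make progress again.
            boolean acceptedAfterDrain =
                    queue.offer("extra", 100, TimeUnit.MILLISECONDS);

            return new boolean[] { acceptedWhileFull, acceptedAfterDrain };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo(2);
        System.out.println("accepted while full: " + result[0]);
        System.out.println("accepted after drain: " + result[1]);
    }
}
```

Raising the queue bound would only buy headroom under this model. If the iterators registered for the dynamic sources are never read, or a reader has stopped after hitting the failed state, the corresponding queues stay full and the fetchers stay parked regardless of the bound.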






Thanks,

Bhavesh



On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkhede@gmail.com>
wrote:

> Can you provide the steps to reproduce this issue?
>
> On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com>
> wrote:
>
> > I am using one from the Kafka Trunk branch.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkhede@gmail.com>
> > wrote:
> >
> > > Which version of Kafka are you using on the consumer?
> > >
> > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com>
> > > wrote:
> > >
> > > > > Hi Kafka Community,
> > > >
> > > > > I am using the Kafka trunk source code and I get the following
> > > > > exception. What could cause the iterator to be in the FAILED state?
> > > > > Please let me know how I can fix this issue.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > >
> > > > > Here are the properties:
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", zkConnect);
> > > >         props.put("group.id", groupId);
> > > > >         props.put("consumer.timeout.ms", "-1");
> > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > >         props.put("auto.commit.interval.ms", "2000");
> > > >         props.put("rebalance.max.retries", "8");
> > > >         props.put("auto.offset.reset", "largest");
> > > >         props.put("fetch.message.max.bytes","2097152");
> > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > >         props.put("auto.commit.enable","true");
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > >
> >
>
