kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: SimpleConsumer woes
Date Wed, 04 Jan 2012 17:58:19 GMT
Joe,

If you enable GC logging in the consumer, you can see the GC pause times. This
will confirm whether GC is causing the ZK session timeout. If that's indeed the
issue, then a quick fix could be increasing the ZK session timeout.
However, the right fix is to eliminate the long GC pauses by investigating a
potential memory leak in the app.
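For reference, something along these lines (a sketch, not your exact setup: the
launch script, main class, and flag names are assumptions; the -XX flags are
JDK 6/7-era HotSpot, and the property name is from the 0.7-era consumer config,
so verify against your build):

```shell
# Hypothetical consumer launch script: turn on GC logging so pause
# times land in gc.log and can be compared against the ZK timeouts.
JVM_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:gc.log"
java $JVM_OPTS -cp "$CLASSPATH" my.consumer.Main consumer.properties

# In consumer.properties, the stopgap is raising the session timeout
# from its default (property name per the 0.7-era config; check yours):
#   zk.sessiontimeout.ms=30000
```

If the pauses in gc.log regularly exceed the session timeout, that's the
expiration right there.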

Thanks,

Jun

On Wed, Jan 4, 2012 at 9:35 AM, Joe Stein <cryptcom@gmail.com> wrote:

> On Tue, Jan 3, 2012 at 8:32 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > Joe,
> >
> > It seems that you are using the ZK-based consumer, which uses
> > SimpleConsumer under the cover. The exception in SimpleConsumer seems to
> be
> > caused by a broker failure. The ZK exception you saw is typically caused
> by
> > ZK session expiration. Do you see that?
>
>
> yes
>
>
> > If the ZK session only expires
> > occasionally, it is not a problem as long as rebalance completes in the end
> > (you should see "Consumer ... selected partitions"). If there are constant
> > ZK session expirations, then rebalance may not complete.
>
>
> yup, they are happening a lot, consistently
>
>
> > ZK session
> > expiration is typically caused by GC.
> >
>
> any way to stop or prevent my issue? =8^)
>
>
> >
> > Also, which version of Kafka are you using? Which patch did you apply?
> >
> >
> I am running off of trunk. I applied
> https://issues.apache.org/jira/browse/KAFKA-229 (now on trunk also) just so
> I could see the error in multifetch
>
>
> > Thanks,
> >
> > Jun
> >
> > On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <cryptcom@gmail.com> wrote:
> >
> > > So my SimpleConsumer woes seem to continue :( I am thinking of not using
> > > a SimpleConsumer implementation anymore and rolling my own unless there
> > > is another option or a fix here I can implement?
> > >
> > > So, after applying my patch (to see this error) this is the error I am
> > > getting after some time goes by of consuming my stream
> > >
> > > 2012-01-03 15:04:28,871 INFO
> > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > kafka.consumer.ZookeeperConsumerConnector - Consumer
> > > test-consumer-group_doozer selected partitions : controserverlogs:0-0:
> > > fetched offset = 137564195795: consumed offset = 137564195795
> > > 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer
> > > - multifetch reconnect due to java.nio.channels.ClosedByInterruptException
> > > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
> > > at kafka.utils.Utils$.read(Utils.scala:497)
> > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
> > > at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
> > > at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
> > > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
> > >
> > >
> > > it does this frequently, and if I leave it running it eventually does
> > > this (after starting back up and rebalancing)
> > >
> > > 2012-01-03 15:06:18,823 INFO
> > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer
> > > test-consumer-group_doozer try #0
> > > 2012-01-03 15:06:18,825 INFO
> > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
> > > org.I0Itec.zkclient.exception.ZkNoNodeException:
> > > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> > > NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
> > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
> > > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
> > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
> > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
> > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
> > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
> > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
> > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
> > > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> > > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> > > KeeperErrorCode = NoNode for
> > > /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > > at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> > > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> > > at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > >
> > >
> > >
> > > granted, this could all be about my implementation. It is based largely
> > > on the "SimpleConsumerShell", as I was not sure of any other starting
> > > point without a lot of my own heavy lifting and just doing it myself
> > > (which seems to have a lot involved, which is fine; then something like
> > > https://issues.apache.org/jira/browse/KAFKA-232 will be important for me
> > > too, something I looked at today and need to dig into more)
> > >
> > > thoughts? thanks! what are other folks doing? is anyone using the
> > > SimpleConsumer successfully?
> > >
> > > --
> > >
> > > /*
> > > Joe Stein
> > > http://www.linkedin.com/in/charmalloc
> > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > */
> > >
> >
>
>
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>
