kafka-users mailing list archives

From Joe Stein <crypt...@gmail.com>
Subject Re: SimpleConsumer woes
Date Wed, 04 Jan 2012 18:13:35 GMT
thanks, let me hack on that a bit
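One way to confirm the GC theory from inside the consumer process is the standard java.lang.management API (nothing Kafka-specific). A minimal sketch; the class and method names here are illustrative, not from the Kafka codebase:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcPauseCheck {
    // Total milliseconds spent in GC across all collectors so far.
    // If this grows by amounts approaching the ZK session timeout
    // between samples, GC pauses are a plausible cause of expiration.
    static long totalGcTimeMs() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if undefined for this collector
            if (t > 0) total += t;
        }
        return total;
    }

    public static void main(String[] args) {
        long before = totalGcTimeMs();
        System.gc(); // suggest a collection so the counter moves (not guaranteed)
        long after = totalGcTimeMs();
        System.out.println("GC time so far: " + after
                + " ms (delta " + (after - before) + " ms)");
    }
}
```

Sampling this periodically from a background thread, alongside the timestamps of the ZK expiration events, shows whether the two line up.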

On Wed, Jan 4, 2012 at 12:58 PM, Jun Rao <junrao@gmail.com> wrote:

> Joe,
>
> If you enable GC logging in the consumer, you can see the GC times. This
> will confirm whether GC is causing the ZK session timeouts. If that's
> indeed the issue, then a quick fix could be increasing the ZK session
> timeout. However, the right fix is to eliminate the long GC pauses by
> investigating a potential memory leak in the app.
>
> Thanks,
>
> Jun
>
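Jun's two suggestions can be made concrete. A sketch, assuming a HotSpot JVM of this era and 0.7-style consumer properties; the flag and property names should be checked against the exact release in use:

```shell
# 1) Enable GC logging in the consumer JVM (pre-Java-9 HotSpot flags):
java -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
     -Xloggc:consumer-gc.log -cp ... your.ConsumerMain

# 2) As a stopgap, raise the ZK session timeout in the consumer properties
#    (zk.sessiontimeout.ms is the 0.7-era name; the default is 6000 ms):
#      zk.sessiontimeout.ms=30000
```

Long "Full GC" entries in consumer-gc.log that exceed the session timeout are the smoking gun Jun describes.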
> On Wed, Jan 4, 2012 at 9:35 AM, Joe Stein <cryptcom@gmail.com> wrote:
>
> > On Tue, Jan 3, 2012 at 8:32 PM, Jun Rao <junrao@gmail.com> wrote:
> >
> > > Joe,
> > >
> > > It seems that you are using the ZK-based consumer, which uses
> > > SimpleConsumer under the cover. The exception in SimpleConsumer seems to
> > > be caused by a broker failure. The ZK exception you saw is typically
> > > caused by ZK session expiration. Do you see that?
> >
> >
> > yes
> >
> >
> > > If the ZK session only expires
> > > occasionally, it is not a problem as long as rebalance completes in the
> > > end (you should see "Consumer ... selected partitions"). If there are
> > > constant ZK session expirations, then rebalance may not complete.
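Under the covers, "rebalance may not complete" means this: the consumer retries the rebalance a bounded number of times with a backoff between attempts, and if every attempt fails it gives up. A generic sketch of that pattern (not the actual ZookeeperConsumerConnector code; names are illustrative):

```java
import java.util.function.BooleanSupplier;

public class BoundedRetry {
    // Run op up to maxTries times, sleeping backoffMs between failed attempts.
    // Returns true as soon as op succeeds, false if every attempt fails --
    // the analogue of a rebalance that never completes.
    static boolean retry(BooleanSupplier op, int maxTries, long backoffMs)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            if (op.getAsBoolean()) return true;
            Thread.sleep(backoffMs);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Succeeds on the third attempt.
        boolean ok = retry(() -> ++calls[0] >= 3, 4, 10);
        System.out.println("succeeded=" + ok + " after " + calls[0] + " tries");
    }
}
```

With constant session expirations, every attempt fails the same way, which is why raising the timeout (or fixing the GC pauses) matters more than raising the retry count.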
> >
> >
> > yup, they are happening a lot, consistently
> >
> >
> > > ZK session
> > > expiration is typically caused by GC.
> > >
> >
> > any way to stop or prevent my issue? =8^)
> >
> >
> > >
> > > Also, which version of Kafka are you using? Which patch did you apply?
> > >
> > >
> > I am running off of trunk. I applied
> > https://issues.apache.org/jira/browse/KAFKA-229 (now on trunk also) just
> > so I could see the error in multifetch.
> >
> >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <cryptcom@gmail.com> wrote:
> > >
> > > > So my SimpleConsumer woes seem to continue :( I am thinking of not
> > > > using a SimpleConsumer implementation anymore and rolling my own,
> > > > unless there is another option or a fix here I can implement.
> > > >
> > > > After applying my patch (to see this error), this is the error I am
> > > > getting after some time spent consuming my stream:
> > > >
> > > > 2012-01-03 15:04:28,871 INFO
> > > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > > kafka.consumer.ZookeeperConsumerConnector - Consumer
> > > > test-consumer-group_doozer selected partitions : controserverlogs:0-0:
> > > > fetched offset = 137564195795: consumed offset = 137564195795
> > > > 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer
> > > > - multifetch reconnect due to java.nio.channels.ClosedByInterruptException
> > > > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> > > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
> > > > at kafka.utils.Utils$.read(Utils.scala:497)
> > > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> > > > at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> > > > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
> > > > at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
> > > > at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
> > > > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
> > > >
> > > >
> > > > It does this frequently, and if I leave it running it then eventually
> > > > does this (after starting back up and rebalancing):
> > > >
> > > > 2012-01-03 15:06:18,823 INFO
> > > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > > kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer
> > > > test-consumer-group_doozer try #0
> > > > 2012-01-03 15:06:18,825 INFO
> > > > ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> > > > kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
> > > > org.I0Itec.zkclient.exception.ZkNoNodeException:
> > > > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> > > > NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > > at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
> > > > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
> > > > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
> > > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
> > > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
> > > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
> > > > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
> > > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
> > > > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
> > > > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> > > > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > > > Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> > > > KeeperErrorCode = NoNode for
> > > > /consumers/test-consumer-group/ids/test-consumer-group_doozer
> > > > at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > > > at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > > > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> > > > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> > > > at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > > > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > > > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > > >
> > > >
> > > >
> > > > Granted, this could all be about my implementation. It is based largely
> > > > on the "SimpleConsumerShell", as I was not sure of any other starting
> > > > point short of a lot of my own heavy lifting and just doing it myself
> > > > (which seems to involve a lot, which is fine; then something like
> > > > https://issues.apache.org/jira/browse/KAFKA-232 will be important for
> > > > me too, something I looked at today and need to dig into more).
> > > >
> > > > thoughts? thanks! what are other folks doing? is anyone using the
> > > > SimpleConsumer successfully?
> > > >
> > >
> >
> >
> >
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/
