kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: SimpleConsumer woes
Date Wed, 04 Jan 2012 17:42:01 GMT
Joe,

>> ZK session
>> expiration is typically caused by GC.
>>

> any way to stop or prevent my issue? =8^)

Yes. First, can you confirm whether it is your client that is GCing, using
jconsole or a similar tool? The session expirations can come either from the
ZooKeeper servers underperforming or from your client going through frequent GCs.
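
If attaching jconsole is inconvenient, the same numbers can be read from inside the consumer's JVM through the standard java.lang.management beans. The following is only a rough sketch: the class name GcProbe, the method names, and the interval and budget values are all made up for illustration, and the figure it prints is cumulative collector time per interval, which is not the same thing as a single stop-the-world pause (concurrent collectors in particular report time that does not block the application). Treat a large value as a hint to look closer, not as proof.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of Kafka or ZooKeeper.
public class GcProbe {

    /** Milliseconds spent in GC since JVM start, summed over all collectors. */
    static long totalGcMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 when the collector does not report time
            if (t > 0) {
                total += t;
            }
        }
        return total;
    }

    /** True when the GC time accumulated between two samples exceeds the budget. */
    static boolean exceedsBudget(long beforeMillis, long afterMillis, long budgetMillis) {
        return afterMillis - beforeMillis > budgetMillis;
    }

    /**
     * Starts a daemon thread that prints the GC time of each interval to stderr.
     * budgetMillis is an arbitrary threshold (assumption): choose it relative to
     * the ZooKeeper session timeout your consumer is configured with.
     */
    static Thread start(final long intervalMillis, final long budgetMillis) {
        Thread t = new Thread(() -> {
            long prev = totalGcMillis();
            try {
                while (true) {
                    Thread.sleep(intervalMillis);
                    long now = totalGcMillis();
                    System.err.println("GC time in last " + intervalMillis + " ms: "
                            + (now - prev) + " ms"
                            + (exceedsBudget(prev, now, budgetMillis) ? "  <-- over budget" : ""));
                    prev = now;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        }, "gc-probe");
        t.setDaemon(true); // never keeps the consumer process alive
        t.start();
        return t;
    }
}
```

Calling something like GcProbe.start(5000, 2000) once at consumer startup would log a line every five seconds. If the "over budget" lines line up in time with the ZK session expirations in the consumer log, the client is the likely culprit; if they do not, the ZooKeeper servers are worth a look.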

Thanks,
Neha


On Wed, Jan 4, 2012 at 9:35 AM, Joe Stein <cryptcom@gmail.com> wrote:
> On Tue, Jan 3, 2012 at 8:32 PM, Jun Rao <junrao@gmail.com> wrote:
>
>> Joe,
>>
>> It seems that you are using the ZK-based consumer, which uses
>> SimpleConsumer under the cover. The exception in SimpleConsumer seems to be
>> caused by a broker failure. The ZK exception you saw is typically caused by
>> ZK session expiration. Do you see that?
>
>
> yes
>
>
>> If the ZK session only expires
>> occasionally, it is not a problem as long as rebalance completes in the end
>> (you should see "Consumer ... selected partitions"). If there are constant
>> ZK session expirations, then rebalance may not complete.
>
>
> yup they are happening a lot, consistent
>
>
>> ZK session
>> expiration is typically caused by GC.
>>
>
> any way to stop or prevent my issue? =8^)
>
>
>>
>> Also, which version of Kafka are you using? Which patch did you apply?
>>
>>
> I am running off of trunk, I applied
> https://issues.apache.org/jira/browse/KAFKA-229 (now on trunk also) just so
> i could see the error in multifetch
>
>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <cryptcom@gmail.com> wrote:
>>
>> > So my SimpleConsumer woes seem to continue :( I am thinking of not using a
>> > SimpleConsumer implementation anymore and rolling my own unless there is
>> > another option or a fix here I can implement?
>> >
>> > So, after applying my patch (to see this error) this is the error I am
>> > getting after some time goes by of consuming my stream
>> >
>> > 2012-01-03 15:04:28,871 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - Consumer test-consumer-group_doozer selected partitions : controserverlogs:0-0: fetched offset = 137564195795: consumed offset = 137564195795
>> > 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer - multifetch reconnect due to
>> > java.nio.channels.ClosedByInterruptException
>> > at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
>> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
>> > at kafka.utils.Utils$.read(Utils.scala:497)
>> > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
>> > at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
>> > at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>> > at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
>> > at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
>> > at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
>> > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
>> >
>> >
>> > It does this frequently, and if I leave it running it then eventually does
>> > this (after starting back up and rebalancing):
>> >
>> > 2012-01-03 15:06:18,823 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer test-consumer-group_doozer try #0
>> > 2012-01-03 15:06:18,825 INFO ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181 kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
>> > org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
>> > at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>> > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
>> > at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
>> > at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
>> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
>> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
>> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
>> > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
>> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
>> > at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
>> > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>> > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
>> > at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>> > at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>> > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>> > at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>> > at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>> > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>> > at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>> > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >
>> >
>> >
>> > Granted, this could be all about my implementation. It is based largely on
>> > the "SimpleConsumerShell", as I was not sure of any other starting point
>> > short of a lot of my own heavy lifting and just doing it myself. That seems
>> > to have a lot involved, which is fine, but then something like
>> > https://issues.apache.org/jira/browse/KAFKA-232 will be important for me
>> > too (something I looked at today; I need to dig more).
>> >
>> > thoughts? thanks? what are other folks doing?  is anyone using the
>> > SimpleConsumer successfully?
>> >
>> > --
>> >
>> > /*
>> > Joe Stein
>> > http://www.linkedin.com/in/charmalloc
>> > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> > */
>> >
>>
>
>
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
