kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: SimpleConsumer woes
Date Wed, 04 Jan 2012 01:32:41 GMT
Joe,

It seems that you are using the ZK-based consumer, which uses
SimpleConsumer under the covers. The exception in SimpleConsumer seems to be
caused by a broker failure. The ZK exception you saw is typically caused by
ZK session expiration. Do you see session expirations in your log? If the ZK
session only expires occasionally, it is not a problem as long as rebalance
completes in the end (you should see "Consumer ... selected partitions"). If
there are constant ZK session expirations, then rebalance may not complete.
ZK session expiration is typically caused by GC pauses.
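
One way to check whether rebalances complete in the end is to scan the consumer log for the messages quoted in this thread. The following is only a minimal sketch, assuming each log event sits on a single line and contains one of the three phrases seen in the log excerpts here; the function name summarize_rebalances is hypothetical and not part of Kafka.

```python
# Phrases copied from the consumer log excerpts quoted in this thread.
# Assumption: each log event is on one line and contains at most one of them.
MARKERS = {
    "begun": "begin rebalancing consumer",
    "failed": "exception during rebalance",
    "completed": "selected partitions",
}


def summarize_rebalances(lines):
    """Count rebalance events and report which kind was seen last.

    Returns (counts, last_event), where last_event is "begun", "failed",
    "completed", or None if no rebalance-related line was found.
    """
    counts = {name: 0 for name in MARKERS}
    last_event = None
    for line in lines:
        for name, marker in MARKERS.items():
            if marker in line:
                counts[name] += 1
                last_event = name
                break
    return counts, last_event
```

Passing it the lines of the consumer log (for example, an open file object) gives the counts; if last_event is "completed", the most recent rebalance finished, while a log that keeps ending in "failed" would point at the constant-expiration case.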

Also, which version of Kafka are you using? Which patch did you apply?

Thanks,

Jun

On Tue, Jan 3, 2012 at 2:25 PM, Joe Stein <cryptcom@gmail.com> wrote:

> So my SimpleConsumer woes seem to continue :( I am thinking of not using a
> SimpleConsumer implementation anymore and rolling my own, unless there is
> another option or a fix here that I can implement?
>
> After applying my patch (to see this error), this is the error I get
> after consuming my stream for some time:
>
> 2012-01-03 15:04:28,871 INFO
> ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - Consumer
> test-consumer-group_doozer selected partitions : controserverlogs:0-0:
> fetched offset = 137564195795: consumed offset = 137564195795
> 2012-01-03 15:04:28,871 INFO FetchRunnable-0 kafka.consumer.SimpleConsumer
> - multifetch reconnect due to
> java.nio.channels.ClosedByInterruptException
> at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:263)
> at kafka.utils.Utils$.read(Utils.scala:497)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:55)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:179)
> at kafka.consumer.SimpleConsumer.liftedTree2$1(SimpleConsumer.scala:119)
> at kafka.consumer.SimpleConsumer.multifetch(SimpleConsumer.scala:117)
> at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:61)
>
>
> It does this frequently, and if I leave it running it eventually does
> this (after starting back up and rebalancing):
>
> 2012-01-03 15:06:18,823 INFO
> ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - begin rebalancing consumer
> test-consumer-group_doozer try #0
> 2012-01-03 15:06:18,825 INFO
> ZkClient-EventThread-763-zoo1:2181,zoo2:2181,zoo3:2181
> kafka.consumer.ZookeeperConsumerConnector - exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException:
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
> NoNode for /consumers/test-consumer-group/ids/test-consumer-group_doozer
> at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
> at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
> at kafka.utils.ZkUtils$.readData(ZkUtils.scala:161)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$getTopicCount(ZookeeperConsumerConnector.scala:414)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:453)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:430)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:75)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:426)
> at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.handleChildChange(ZookeeperConsumerConnector.scala:369)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
> KeeperErrorCode = NoNode for
> /consumers/test-consumer-group/ids/test-consumer-group_doozer
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
> at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
> at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>
>
>
> Granted, this could be all about my implementation. It is based largely on
> the "SimpleConsumerShell", as I was not sure of any other starting point
> short of doing a lot of heavy lifting myself. Doing it myself seems to have
> a lot involved, which is fine, but then something like
> https://issues.apache.org/jira/browse/KAFKA-232 will be important for me
> too. I looked at it today and need to dig more.
>
> Thoughts? Thanks. What are other folks doing? Is anyone using the
> SimpleConsumer successfully?
>
> --
>
> /*
> Joe Stein
> http://www.linkedin.com/in/charmalloc
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> */
>
