kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: How to recover from ConsumerRebalanceFailedException ?
Date Fri, 21 Nov 2014 01:31:08 GMT
Hi Jun,

Would you like me to file a Jira ticket requesting a notification, for both
the new consumer API and the old high-level consumer, that signals when a
consumer stream is dying?  The application could then try to restart it
programmatically.  I understand that the failure itself is caused by network
or ZK cluster instability.

Let me know if you have an alternative proposal for the new and old
high-level consumer APIs.
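
To make the idea concrete, here is a minimal sketch of the kind of
restart-with-exponential-backoff loop an application could run once it knows
the consumer has died.  It is plain Java with no Kafka dependency: the class
name BackoffRestart, the methods delayMillis and retryWithBackoff, and the
"restart" action are all hypothetical placeholders, not part of any Kafka
API.  In a real application the action would presumably shut down the old
consumer connector and create a new one with its streams.

```java
import java.util.concurrent.Callable;

public class BackoffRestart {

    /**
     * Delay before retry number `attempt` (0-based):
     * baseMillis * 2^attempt, capped at maxMillis.
     */
    public static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        // Cap the shift so the multiplier stays within 2^30.
        int shift = Math.min(attempt, 30);
        return Math.min(maxMillis, baseMillis * (1L << shift));
    }

    /**
     * Runs `action` until it succeeds or maxAttempts is reached,
     * sleeping with exponential backoff between failed attempts.
     * Rethrows the last failure if every attempt fails.
     */
    public static <T> T retryWithBackoff(Callable<T> action, int maxAttempts,
                                         long baseMillis, long maxMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                // No point sleeping after the final attempt.
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Hypothetical restart action: fails twice, then succeeds.
        String result = retryWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("restart failed");
            }
            return "restarted";
        }, 5, 10, 1000);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

This only covers the retry side.  The application would still need some
signal that the consumer has died, which is the notification I am asking
about.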

Thanks,

Bhavesh

On Tue, Nov 18, 2014 at 9:53 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com>
wrote:

> Hi Jun,
>
> The ZK cluster is up and running.  What is the best way to recover
> programmatically?  I am willing to implement an exponential-backoff
> recovery process.  Do you think monitoring the status of the
> "ZkClient-EventThread*" thread would be enough to tell that the source
> thread is dead, and therefore to start the exponential reconnect process?
>
> Could you at least provide, for the new consumer API (0.9.0), a callback
> method or notification that the consumer has died, and the reason for it?
>
>
> Thanks,
> Bhavesh
>
>
>
> On Tue, Nov 18, 2014 at 9:34 PM, Jun Rao <junrao@gmail.com> wrote:
>
>> Is your ZK service alive at that point? If not, you just need to monitor
>> the ZK server properly.
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Nov 17, 2014 at 2:30 PM, Bhavesh Mistry <
>> mistry.p.bhavesh@gmail.com>
>> wrote:
>>
>> > Hi Kafka Team,
>> >
>> >
>> > I intermittently get the following exception due to ZK/network issues.
>> > How do I recover *programmatically* when the consumer thread dies, and
>> > restart the source?  We have alerts showing that, because of this
>> > error, partition OWNERSHIP is *none*.  Please let me know how to detect
>> > that the consumer thread has died and needs to be restarted, and how to
>> > restart the source.
>> >
>> >
>> >
>> > 17 Nov 2014 04:29:41,180 ERROR [ZkClient-EventThread-65-dare-msgq00.sv.walmartlabs.com:9091,dare-msgq01.sv.walmartlabs.com:9091,dare-msgq02.sv.walmartlabs.com:9091] (org.I0Itec.zkclient.ZkEventThread.run:77)  - Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@552c5ea8]
>> > kafka.common.ConsumerRebalanceFailedException: mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68 can't rebalance after 8 retries
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>> >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>> >
>> >
>> >
>> >
>> >
>> > ZK Connection Issues:
>> >
>> > java.net.SocketException: Transport endpoint is not connected
>> >         at sun.nio.ch.SocketChannelImpl.shutdown(Native Method)
>> >         at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:633)
>> >         at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:360)
>> >         at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1205)
>> >         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1170)
>> >
>> >
>> >
>> >
>> >         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>> >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>> >         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:449)
>> >         at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:630)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:601)
>> >         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:595)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:592)
>> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:591)
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:481)
>> >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>> > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mupd_statsd_logmon_metric_events18/ids/mupd_statsd_logmon_metric_events18_sdc-stg-logsearch-app1-1416023311208-14501d68
>> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
>> >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
>> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:921)
>> >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:950)
>> >         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>> >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>> >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>> >
>>
>
>
