kafka-users mailing list archives

From Ian Friedman <...@flurry.com>
Subject Re: Offset committing on rebalance
Date Tue, 20 Aug 2013 06:26:01 GMT
Sorry, ignore that first exception; I believe it was caused by an actual manual shutdown.
The NoNode exception, though, has been popping up a lot. I'm not sure if it's relevant,
but it seems to show up a bunch whenever the consumers decide it's time to rebalance continuously.
 

--  
Ian Friedman


On Tuesday, August 20, 2013 at 2:17 AM, Ian Friedman wrote:

> That's not it either. I just had all my consumers shut down on me with this:  
>  
> INFO  21:51:13,948 () ZkUtils$ - conflict in /consumers/flurry1/owners/dataLogPaths/1-183 data: flurry1_hs1030-1376964634130-dcc9192a-0 stored data: flurry1_hs1061-1376964609207-4b7f348b-0
> INFO  21:51:13,948 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a waiting for the partition ownership to be deleted: 1-183
> INFO  21:51:13,950 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a flurry1_hs1030-1376964634130-dcc9192a-0 successfully owned partition 1-180 for topic dataLogPaths
>  
>  
>  
> and I've also been seeing:
>  
> INFO  21:51:15,971 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a begin rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3
> INFO  21:51:16,038 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a exception during rebalance
> org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
>         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
>         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
>         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:162)
>         at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:66)
>         at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:259)
>         at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:258)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
>         at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:258)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:478)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:449)
>         at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:285)
>         at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401)
> Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
>         at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1151)
>         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1180)
>         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>         ... 19 more
> INFO  21:51:16,039 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a end rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3
>  
>  
> any ideas?  
>  
> --  
> Ian Friedman
>  
>  
> On Monday, August 19, 2013 at 11:58 PM, Jun Rao wrote:
>  
> > Any failure/restart of a consumer or a broker can also trigger a rebalance.
> >  
> > Thanks,
> >  
> > Jun
> >  
> >  
> > On Mon, Aug 19, 2013 at 6:00 PM, Ian Friedman <ian@flurry.com> wrote:
> >  
> > > Jun, I read that FAQ entry you linked, but I am not seeing any Zookeeper
> > > connection loss in the logs. It's rebalancing multiple times per minute,
> > > though. Any idea what else could cause this? We're running kafka 0.7.2 on
> > > approx 400 consumers against a topic with 400 partitions * 3 brokers.
> > >  
> > > --
> > > Ian Friedman
> > >  
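
As an aside on that FAQ entry: the settings usually tuned when consumers rebalance this often are the consumer-side ZooKeeper timeouts and rebalance retries. A hedged sketch follows; the property names are recalled from the 0.7-era ConsumerConfig, not taken from this thread, so verify them against your install:

    import java.util.Properties

    // Consumer properties sometimes raised to calm frequent rebalances (0.7-era names).
    object RebalanceTuning {
      def props(): Properties = {
        val p = new Properties()
        p.put("zk.sessiontimeout.ms", "12000")  // longer ZK session: fewer expirations from GC pauses
        p.put("zk.synctime.ms", "2000")         // give ZK followers more time to sync
        p.put("rebalance.retries.max", "8")     // retry the rebalance longer before giving up
        p
      }
    }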
> > >  
> > > On Thursday, August 15, 2013 at 11:52 AM, Jun Rao wrote:
> > >  
> > > > Yes, during rebalances, messages could be re-delivered since the new owner of a partition starts fetching from the last checkpointed offset in ZK.
> > > >
> > > > For reasons on why rebalances happen a lot, see https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog%3F
> > > >  
> > > > Thanks,
> > > >  
> > > > Jun
> > > >  
> > > >  
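
To make "fetching from the last checkpointed offset in ZK" concrete: below is a minimal, hypothetical sketch that reads a group's checkpoint directly with zkclient, the client library visible in the stack traces above. The group, topic, and partition id come from the logs in this thread; the host and the exact path layout are assumptions.

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer

    object OffsetPeek {
      def main(args: Array[String]): Unit = {
        // Raw-bytes serializer, since the offset is stored as a plain string.
        val zk = new ZkClient("zkhost:2181", 30000, 30000, new BytesPushThroughSerializer)
        try {
          // "1-183" is a 0.7-style <brokerId>-<partitionId> id, as in the logs above.
          // Whoever wins a partition during a rebalance resumes from this value,
          // which is why an offset committed mid-processing is never retried.
          val path = "/consumers/flurry1/offsets/dataLogPaths/1-183"
          val data: Array[Byte] = zk.readData(path, true) // true = null if the node is absent
          println(path + " -> " + (if (data == null) "(no checkpoint)" else new String(data)))
        } finally zk.close()
      }
    }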
> > > > On Thu, Aug 15, 2013 at 8:36 AM, Ian Friedman <ian@flurry.com> wrote:
> > > >  
> > > > > It's a simple enough patch, but wouldn't this mean that messages still in process when a rebalance happens could get delivered to another consumer if we end up losing the partition? Rebalances seem to happen very frequently with a lot of consumers for some reason… And it doesn't seem like a consumer is guaranteed or likely to retain ownership of a partition it's in the middle of consuming after a rebalance.
> > > > >  
> > > > > --
> > > > > Ian Friedman
> > > > >  
> > > > >  
> > > > > On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:
> > > > >  
> > > > > > We are only patching blocker issues in 0.7. 0.8 beta1 has been released and most dev effort will be on 0.8 and beyond. That said, this particular case is easy to fix. If you can port the patch in https://issues.apache.org/jira/browse/KAFKA-919 to the 0.7 branch, we can commit that to the 0.7 branch.
> > > > > >  
> > > > > > Thanks,
> > > > > >  
> > > > > > Jun
> > > > > >  
> > > > > >  
> > > > > > On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <ian@flurry.com> wrote:
> > > > > >  
> > > > > > > Ugh.
> > > > > > >  
> > > > > > > Is there any way to make this work in 0.7, or is transitioning to 0.8 the only way? My operations engineers spent a lot of effort in configuring and hardening our 0.7 production install, and 0.8 isn't released yet. Not to mention having to integrate the new client side code.
> > > > > > >  
> > > > > > > Either way, thanks for all your help Jun.
> > > > > > >  
> > > > > > > --
> > > > > > > Ian Friedman
> > > > > > >  
> > > > > > >  
> > > > > > > On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
> > > > > > >  
> > > > > > > > Yes, this is an issue and has been fixed in 0.8.
> > > > > > > >  
> > > > > > > > Thanks,
> > > > > > > >  
> > > > > > > > Jun
> > > > > > > >  
> > > > > > > >  
> > > > > > > > On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <ian@flurry.com> wrote:
> > > > > > > >  
> > > > > > > > > Hey guys,
> > > > > > > > >  
> > > > > > > > > I designed my consumer app (running on 0.7) to run with autocommit off and commit manually once it was done processing a record. The intent was so that if a consumer died while processing a message, the offset would not be committed, and another box would pick up the partition and reprocess the message. This seemed to work fine with small numbers of consumers (~10). But now that I'm scaling it out, I'm running into a problem where it looks like messages that consumers picked up and then errored on are not getting processed on another machine.
> > > > > > > > >
> > > > > > > > > After investigating the logs and the partition offsets in zookeeper, I found that closeFetchersForQueues in ZookeeperConsumerConnector.scala, called during the rebalance process, will commit the offset regardless of the autocommit status. So it looks like even if my consumer is in the middle of processing a message, the offset will be committed, and even if the processing fails, it will never be picked up again. Now that I have a lot of consumer nodes, the rebalancer is going off a lot more often and I'm running into this constantly.
> > > > > > > > >
> > > > > > > > > Were my assumptions faulty? Did I design this wrong? After reading the comment in the code I understand that if it didn't commit the offset there, the message would just get immediately consumed by whoever ended up owning the partition, even if we were in the middle of consuming it elsewhere, and we'd get unintentional duplicate delivery. How can I make it work the way I've described? Is there any way?
> > > > > > > > >
> > > > > > > > > Thanks in advance,
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Ian Friedman
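
A minimal sketch of the manual-commit pattern Ian describes above, written against the 0.7 high-level consumer. This is not his actual app: the property names ("zk.connect", "groupid", "autocommit.enable") and stream types are recalled from the 0.7 era and should be treated as assumptions, and "zkhost:2181" is a placeholder.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.message.Message

    object ManualCommitConsumer {
      def process(msg: Message): Unit = ()  // placeholder for the real per-record work

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zk.connect", "zkhost:2181")   // 0.7 consumers bootstrap via ZooKeeper
        props.put("groupid", "flurry1")          // group name taken from the logs in this thread
        props.put("autocommit.enable", "false")  // disable the periodic autocommit
        val connector = Consumer.create(new ConsumerConfig(props))
        val streams = connector.createMessageStreams(Map("dataLogPaths" -> 1))
        for (stream <- streams("dataLogPaths"); msg <- stream) {
          process(msg)
          // Commit only after the record is fully processed: a crash before this
          // line leaves the offset uncommitted, so another consumer redelivers it.
          connector.commitOffsets
        }
      }
    }

The caveat, as this thread establishes, is that closeFetchersForQueues commits during a rebalance regardless of the autocommit setting, so until a KAFKA-919-style fix is ported, the processing still has to tolerate both occasional loss and redelivery.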

