kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Offset committing on rebalance
Date Thu, 15 Aug 2013 04:21:58 GMT
Yes, this is an issue and has been fixed in 0.8.



On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <ian@flurry.com> wrote:

> Hey guys,
> I designed my consumer app (running on 0.7) to run with autocommit off and
> commit manually once it was done processing a record. The intent was so
> that if a consumer died while processing a message, the offset would not be
> committed, and another box would pick up the partition and reprocess the
> message. This seemed to work fine with small numbers of consumers (~10).
> But now that I'm scaling it out, I'm running into a problem where it looks
> like messages that consumers picked up and then errored on are not getting
> processed on another machine.
> After investigating the logs and the partition offsets in ZooKeeper, I
> found that closeFetchersForQueues in ZookeeperConsumerConnector.scala,
> which is called during the rebalance process, commits the offset regardless
> of the autocommit setting. So even if my consumer is in the middle of
> processing a message, the offset gets committed, and if the processing
> then fails, the message will never be picked up again. Now that I have a
> lot of consumer nodes, the rebalancer is going off a lot more often and I'm
> running into this constantly.
> Were my assumptions faulty? Did I design this wrong? After reading the
> comment in the code I understand that if it didn't commit the offset there,
> the message would just get immediately consumed by whoever ended up owning
> the partition, even if we were in the middle of consuming it elsewhere, and
> we'd get unintentional duplicate delivery. How can I make it work the way
> I've described? Is there any way?
> Thanks in advance,
> --
> Ian Friedman
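
The failure mode described in the question can be modeled with a small, self-contained simulation. This is a hypothetical sketch, not Kafka's API: `OffsetStore` stands in for the per-partition offsets kept in ZooKeeper, `Consumer` for a 0.7 consumer owning one partition, and the rebalance-time commit is reduced to a single `commit()` call. It assumes, as the email describes, that a message counts as consumed as soon as it is handed to the application, so any commit writes a position past the in-flight message.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetStore:
    """Hypothetical stand-in for the per-partition offsets kept in ZooKeeper."""
    committed: dict = field(default_factory=dict)

    def get(self, partition):
        return self.committed.get(partition, 0)

    def commit(self, partition, offset):
        self.committed[partition] = offset


class Consumer:
    """Hypothetical consumer that owns one partition of an in-memory log."""

    def __init__(self, log, store, partition="p0"):
        self.log = log
        self.store = store
        self.partition = partition
        # Next offset to hand out; a new owner starts at the committed offset.
        self.consumed = store.get(partition)

    def next_message(self):
        msg = self.log[self.consumed]
        # The message counts as consumed as soon as it reaches the app,
        # before the app has finished processing it.
        self.consumed += 1
        return msg

    def commit(self):
        # Writes the consumed position, regardless of who calls it.
        self.store.commit(self.partition, self.consumed)


def run(log, store, fail_at, rebalance_during_failure):
    """Consume messages until the one at offset `fail_at` fails, then give the
    partition to a fresh consumer and return the first message it reads."""
    consumer = Consumer(log, store)
    while True:
        offset = consumer.consumed
        consumer.next_message()
        if offset == fail_at:
            # Simulated processing failure. A rebalance that fires now commits
            # anyway, which is the behavior reported for 0.7.
            if rebalance_during_failure:
                consumer.commit()
            break
        # Manual commit only after processing succeeded (autocommit off).
        consumer.commit()
    successor = Consumer(log, store)
    return successor.next_message()


log = ["m0", "m1", "m2", "m3"]
print(run(log, OffsetStore(), fail_at=2, rebalance_during_failure=False))  # m2
print(run(log, OffsetStore(), fail_at=2, rebalance_during_failure=True))   # m3
```

Without a rebalance, the next owner resumes at the failed message, which is the at-least-once redelivery the design relies on. When the rebalance commit fires while the message is in flight, the committed offset has already moved past it, so the new owner skips it.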
