kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ian Friedman <...@flurry.com>
Subject Offset committing on rebalance
Date Thu, 15 Aug 2013 00:21:56 GMT
Hey guys, 

I designed my consumer app (running on 0.7) to run with autocommit off and commit manually
once it was done processing a record. The intent was so that if a consumer died while processing
a message, the offset would not be committed, and another box would pick up the partition
and reprocess the message. This seemed to work fine with small numbers of consumers (~10).
But now that I'm scaling it out, I'm running into a problem where it looks like messages that
consumers picked up and then errored on are not getting processed on another machine.

After investigating the logs and the partition offsets in zookeeper, I found that in ZookeeperConsumerConnector.scala
closeFetchersForQueues, called during the rebalance process, will commit the offset regardless
of the autocommit status. So it looks like even if my consumer is in the middle of processing
a message, the offset will be committed, and even if the processing fails, it will never be
picked up again. Now that I have a lot of consumer nodes, the rebalancer is going off a lot
more often and I'm running into this constantly.

Were my assumptions faulty? Did I design this wrong? After reading the comment in the code
I understand that if it didn't commit the offset there, the message would just get immediately
consumed by whoever ended up owning the partition, even if we were in the middle of consuming
it elsewhere, and we'd get unintentional duplicate delivery. How can I make it work the way
I've described? Is there any way?

Thanks in advance,

-- 
Ian Friedman


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message