kafka-users mailing list archives

From Stevo Slavić <ssla...@gmail.com>
Subject Re: At-least-once guarantees with high-level consumer
Date Wed, 17 Jun 2015 08:12:50 GMT
With auto-commit, one can only get an at-most-once delivery guarantee: after
the offset is committed but before the message is delivered for processing, or
even after it is delivered but before it is processed, things can fail, causing
the event not to be processed, which is essentially the same outcome as if it
had not been delivered at all.
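
A toy simulation makes that failure window concrete. It is plain Java rather than Kafka code: the in-memory log, `runSession` and the crash point are hypothetical, and it assumes the worst-case timing in which an auto-commit fires after a message has been handed out but before it has been processed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Kafka code: a message counts as consumed as soon as it is handed
// out, so an auto-commit can record it before process() has run.
public class AtMostOnceDemo {
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3");

    // Reads LOG from startOffset, appending each processed message to `processed`.
    // crashAt is the offset whose processing is interrupted by a crash (-1 = none).
    // Returns the committed offset, i.e. where a restarted consumer resumes.
    static int runSession(int startOffset, int crashAt, List<String> processed) {
        int committed = startOffset;
        for (int offset = startOffset; offset < LOG.size(); offset++) {
            String message = LOG.get(offset);   // next(): hand the message out...
            int consumedOffset = offset + 1;    // ...and advance the consumed offset
            committed = consumedOffset;         // worst case: auto-commit fires right now
            if (offset == crashAt) {
                return committed;               // crash before process(message) finishes
            }
            processed.add(message);             // process(message)
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        int resumeAt = runSession(0, 2, processed);  // crash while m2 is in hand
        runSession(resumeAt, -1, processed);          // restart from the committed offset
        System.out.println(processed);                // prints [m0, m1, m3]: m2 is lost
    }
}
```

The restarted consumer resumes at offset 3, so m2 is never processed.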

On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heymann@gmail.com> wrote:

> Hi
>
> ** Disclaimer: I know there's a new consumer API on the way; this mail is
> about the currently available API. I also apologise if the following has
> already been discussed. I did try to check previous discussions
> on ConsumerIterator. **
>
> It seems to me that the high-level consumer would be able to support
> at-least-once messaging, even if one uses auto-commit, by changing
> kafka.consumer.ConsumerIterator.next() to call
> currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
> consumer thread for a KafkaStream could just loop:
>
> while (true) {
>     MyMessage message = iterator.next().message();
>     process(message);
> }
>
> Each call to "iterator.next()" then updates the offset to commit to the end
> of the message that was just processed. When offsets are committed for the
> ConsumerConnector (either automatically or manually), the commit will not
> include offsets of messages that haven't been fully processed.
>
> I've tested the following ConsumerIterator.next(), and it seems to work as
> I expect:
>
>   override def next(): MessageAndMetadata[K, V] = {
>     // New code: reset consumer offset to the end of the previously consumed message:
>     if (consumedOffset > -1L && currentTopicInfo != null) {
>       currentTopicInfo.resetConsumeOffset(consumedOffset)
>       val topic = currentTopicInfo.topic
>       trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
>     }
>
>     // Old code, excluding reset:
>     val item = super.next()
>     if (consumedOffset < 0)
>       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
>     val topic = currentTopicInfo.topic
>     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>     item
>   }
>
> I've seen several people asking about committing offsets manually with
> the high-level consumer. I suspect that this approach (the modified
> ConsumerIterator) would scale better than having a separate
> ConsumerConnector per stream just so that you can commit offsets with
> at-least-once semantics (commitOffsets() on a connector commits the offsets
> of all of its streams at once). The downside of this approach is more
> duplicate deliveries after recovery from a hard failure (but that is what
> "at least once" means, as opposed to "exactly once").
>
> I don't propose that the code necessarily be changed like this in trunk; I
> just want to know whether the approach seems reasonable.
>
> Regards
> Carl Heymann
>
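
As a toy illustration of the proposed ordering (plain Java, not Kafka code; the log, `runSession` and the crash point are hypothetical), the simulation below crashes at the worst moment for this scheme: right after a message has been processed but before the loop calls `next()` again. It assumes an auto-commit fires after every step. Nothing is lost, but that message is processed twice after the restart, which is the kind of duplicate delivery Carl mentions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Kafka code: next() first makes the end of the *previous* message
// committable and only then hands out the next message (the proposed ordering).
public class AtLeastOnceDemo {
    static final List<String> LOG = List.of("m0", "m1", "m2", "m3");

    // Reads LOG from startOffset, appending each processed message to `processed`.
    // crashAt is the offset after whose processing the consumer crashes, before it
    // calls next() again (-1 = no crash). Assumes, as the worst case, that an
    // auto-commit fires after every step. Returns the offset a restart resumes from.
    static int runSession(int startOffset, int crashAt, List<String> processed) {
        int committed = startOffset;
        int consumedOffset = -1;                 // end of the last handed-out message
        for (int offset = startOffset; offset < LOG.size(); offset++) {
            if (consumedOffset > -1) {
                committed = consumedOffset;      // new step: previous message is done
            }
            String message = LOG.get(offset);    // old step: hand out the next message
            consumedOffset = offset + 1;         // remembered, not yet committable
            processed.add(message);              // process(message)
            if (offset == crashAt) {
                return committed;                // crash before the next call to next()
            }
        }
        // The loop's next call to next() (which would then wait for new data)
        // makes the last message committable as well.
        if (consumedOffset > -1) {
            committed = consumedOffset;
        }
        return committed;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        int resumeAt = runSession(0, 2, processed);  // crash right after processing m2
        runSession(resumeAt, -1, processed);          // restart from the committed offset
        System.out.println(processed);                // prints [m0, m1, m2, m2, m3]
    }
}
```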

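The same reordering can be sketched as a small generic wrapper around any `java.util.Iterator`. This is a hypothetical illustration, not Kafka's API: `DeferredCommitIterator` and `committableOffset()` are invented names, offsets are assumed to be contiguous from `startOffset`, and the `AtomicLong` stands in for wherever a separate commit thread would read the offset from.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical analogue of the modified ConsumerIterator.next(), not Kafka code:
// a message's end offset becomes committable only when the caller asks for the
// next message, i.e. once it has finished processing the previous one.
public class DeferredCommitIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private long position;                  // offset of the next message in source
    private long consumedOffset = -1L;      // end of the last handed-out message
    // AtomicLong because a separate commit thread would read it concurrently.
    private final AtomicLong committable;

    public DeferredCommitIterator(Iterator<T> source, long startOffset) {
        this.source = source;
        this.position = startOffset;
        this.committable = new AtomicLong(startOffset);
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        // New step: the caller is done with the previous message, so publish its end.
        if (consumedOffset > -1L) {
            committable.set(consumedOffset);
        }
        // Old step: hand out the next message and remember where it ends.
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        T item = source.next();
        position++;
        consumedOffset = position;
        return item;
    }

    // The offset an auto-commit (or manual commit) would write at this moment.
    public long committableOffset() {
        return committable.get();
    }

    public static void main(String[] args) {
        DeferredCommitIterator<String> it =
            new DeferredCommitIterator<>(List.of("a", "b", "c").iterator(), 100L);
        it.next();                                   // "a" (offset 100) handed out
        System.out.println(it.committableOffset());  // prints 100: "a" may still be in process()
        it.next();                                   // caller has finished with "a"
        System.out.println(it.committableOffset());  // prints 101
    }
}
```

As in the Scala version, the last message handed out only becomes committable on the following call to `next()`. In this sketch that call throws `NoSuchElementException` once the source is exhausted (after publishing the offset), whereas the high-level consumer's iterator by default blocks waiting for more data.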