kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: At-least-once guarantees with high-level consumer
Date Thu, 18 Jun 2015 21:19:59 GMT
Hi Carl,

Producer-side retries can result in the same message being sent to the
brokers more than once, with different offsets. You may also get duplicates
when the high-level consumer's offset has not been saved or committed, but
you have already processed the data and your server restarts, etc.



To guarantee at-least-once processing without duplicates across partitions
(and across servers), you will need to store a message hash or primary key
in a distributed LRU cache (with an eviction policy) like Hazelcast
<http://www.hazelcast.com> and do de-duplication across partitions.
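To illustrate, here is a minimal in-process sketch of that de-duplication
idea. A bounded, access-ordered LinkedHashMap stands in for the distributed
cache (with Hazelcast you would instead use a distributed map configured
with an eviction policy); the class and method names are made up:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for a distributed LRU cache of message keys. With Hazelcast you
// would use a distributed map configured with an eviction policy instead.
public class DedupCache {
    private final Map<String, Boolean> seen;

    public DedupCache(final int maxEntries) {
        // accessOrder=true makes this LinkedHashMap evict the least
        // recently used key once it grows past maxEntries.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true if this key (e.g. a message hash or primary key) has
    // not been seen before, i.e. the message should be processed.
    public synchronized boolean firstTime(String messageKey) {
        return seen.put(messageKey, Boolean.TRUE) == null;
    }
}
```

A consumer thread would call firstTime(hash(message)) and skip processing
when it returns false. Note the eviction policy bounds memory, but it also
bounds how far back duplicates can still be detected.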



I hope this helps!



Thanks,

Bhavesh


On Wed, Jun 17, 2015 at 1:49 AM, yewton <yewton@gmail.com> wrote:

> So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?
>
> On 2015-06-17 08:12:50 +0000, Stevo Slavić wrote:
>
>> With auto-commit one can only have an at-most-once delivery guarantee:
>> after the commit but before the message is delivered for processing, or
>> even after it is delivered but before it is processed, things can fail,
>> causing the event not to be processed, which is basically the same
>> outcome as if it had not been delivered.
>>
>> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heymann@gmail.com>
>> wrote:
>>
>> > Hi
>> >
>> > ** Disclaimer: I know there's a new consumer API on the way; this mail
>> > is about the currently available API. I also apologise if the below
>> > has already been discussed previously. I did try to check previous
>> > discussions on ConsumerIterator. **
>> >
>> > It seems to me that the high-level consumer would be able to support
>> > at-least-once messaging, even if one uses auto-commit, by changing
>> > kafka.consumer.ConsumerIterator.next() to call
>> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
>> > way, a consumer thread for a KafkaStream could just loop:
>> >
>> > while (true) {
>> >     MyMessage message = iterator.next().message();
>> >     process(message);
>> > }
>> >
>> > Each call to "iterator.next()" then updates the offset to commit to
>> > the end of the message that was just processed. When offsets are
>> > committed for the ConsumerConnector (either automatically or
>> > manually), the commit will not include offsets of messages that
>> > haven't been fully processed.
>> >
>> > I've tested the following ConsumerIterator.next(), and it seems to
>> > work as I expect:
>> >
>> >   override def next(): MessageAndMetadata[K, V] = {
>> >     // New code: reset consumer offset to the end of the previously
>> >     // consumed message:
>> >     if (consumedOffset > -1L && currentTopicInfo != null) {
>> >         currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >         val topic = currentTopicInfo.topic
>> >         trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
>> >     }
>> >
>> >     // Old code, excluding reset:
>> >     val item = super.next()
>> >     if (consumedOffset < 0)
>> >       throw new KafkaException(
>> >         "Offset returned by the message set is invalid %d".format(consumedOffset))
>> >     val topic = currentTopicInfo.topic
>> >     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
>> >     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
>> >     item
>> >   }
>> >
>> > I've seen several people asking about managing commit offsets
>> > manually with the high level consumer. I suspect that this approach
>> > (the modified ConsumerIterator) would scale better than having a
>> > separate ConsumerConnector per stream just so that you can commit
>> > offsets with at-least-once semantics. The downside of this approach
>> > is more duplicate deliveries after recovery from hard failure (but
>> > this is "at least once", right, not "exactly once").
>> >
>> > I don't propose that the code necessarily be changed like this in
>> > trunk; I just want to know if the approach seems reasonable.
>> >
>> > Regards
>> > Carl Heymann
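To make the distinction concrete, here is a toy simulation (not the Kafka
API; all names are invented) of the point Stevo makes above: whether the
offset commit happens before or after processing determines the delivery
guarantee.

```java
import java.util.List;

// Toy simulation (not the Kafka API): where the offset commit happens
// relative to processing determines the delivery guarantee.
public class CommitOrdering {
    int committedOffset = 0;                   // pretend this survives a crash
    StringBuilder processed = new StringBuilder();

    // Replays the log from the last committed offset. crashAt is the offset
    // at which the consumer "dies" (-1 = no crash); crashBeforeProcess picks
    // whether it dies before or after handling that message.
    void run(List<String> log, boolean commitFirst,
             int crashAt, boolean crashBeforeProcess) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            if (commitFirst) committedOffset = offset + 1;        // auto-commit style
            if (offset == crashAt && crashBeforeProcess) return;  // committed, never processed
            processed.append(log.get(offset));
            if (offset == crashAt && !crashBeforeProcess) return; // processed, never committed
            if (!commitFirst) committedOffset = offset + 1;       // commit after processing
        }
    }
}
```

Committing first and crashing before processing loses the message
(at-most-once); committing after processing and crashing before the commit
redelivers it on restart (at-least-once).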
