kafka-users mailing list archives

From Carl Heymann <ch.heym...@gmail.com>
Subject Re: At-least-once guarantees with high-level consumer
Date Mon, 22 Jun 2015 07:36:26 GMT
OK, thanks. I agree, the current code is better if you get lots of
rebalancing, and you can do your own thing for stronger guarantees.

For the new consumer, it looks like it should be possible to use multiple
threads, as long as partition order is preserved in the processing, right?
So, one can build a custom API similar to the current connector + streams.
But I guess that's a different discussion.
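
For what it's worth, roughly what I have in mind is the untested sketch
below, against the new consumer API: one single-threaded executor per
partition, so records from the same partition stay in order while
partitions proceed in parallel. The dispatcher class is my own invention,
and offset commit handling and executor shutdown are omitted.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class PartitionOrderedDispatcher {
    private final Map<TopicPartition, ExecutorService> executors = new HashMap<>();

    // Called from the single thread that owns the KafkaConsumer and polls.
    public void dispatch(ConsumerRecords<String, String> records) {
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(tp);
            // One single-threaded executor per partition preserves
            // per-partition processing order.
            executors.computeIfAbsent(tp, t -> Executors.newSingleThreadExecutor())
                     .submit(() -> batch.forEach(PartitionOrderedDispatcher::process));
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Application-specific processing goes here.
    }
}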

With the new consumer API, rebalancing is handled during poll(), which is
called from a client. If some client stops polling, will this cause
rebalancing hiccups for all consumers in the group? Let me know if this
has already been discussed.

On Mon, Jun 22, 2015 at 8:50 AM, Jiangjie Qin <jqin@linkedin.com.invalid>
wrote:

> Yes, your approach works. I am not sure if we should take this as the
> default solution, though. Users can have a simple wrapper + customized
> rebalance listener. The tricky part is that the rebalance listener might
> need different implementations. So it looks like the current API provides
> enough simplicity and enough flexibility.
>
> For the new consumer, if there is only one user thread, this might not be
> an issue. If the consumer is shared by multiple threads (which is not
> recommended), the same principle applies: commit offsets only after
> processing them.
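>
> A rough, untested sketch of such a wrapper, using the old consumer's Java
> API (committing per message only for clarity - in practice you would
> commit less often, disable auto-commit, and add the rebalance-listener
> part, which is omitted here):
>
> import java.util.function.Consumer;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.MessageAndMetadata;
>
> public class AtLeastOnceStream<K, V> {
>     private final ConsumerConnector connector;
>     private final KafkaStream<K, V> stream;
>
>     public AtLeastOnceStream(ConsumerConnector connector, KafkaStream<K, V> stream) {
>         this.connector = connector;
>         this.stream = stream;
>     }
>
>     public void run(Consumer<MessageAndMetadata<K, V>> handler) {
>         ConsumerIterator<K, V> it = stream.iterator();
>         while (it.hasNext()) {
>             handler.accept(it.next()); // process first...
>             connector.commitOffsets(); // ...then commit
>         }
>     }
> }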
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 6/21/15, 10:50 PM, "Carl Heymann" <ch.heymann@gmail.com> wrote:
>
> >Thanks Jiangjie
> >
> >So you agree that with the modified ConsumerIterator.next() code, the high
> >level consumer becomes at-least-once, even with auto-commit enabled? That
> >is what I really want to know.
> >
> >I'll have a look at the rebalancing code. I think I understand: during
> >rebalancing, with auto-commit enabled, the offsets are committed
> >in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
> >might still be happening at this point. The rebalance listener is called
> >only after this commit. So the current code (without my change) leads to
> >fewer duplicate messages, because it assumes that these transactions
> >normally complete. This seems prudent, since rebalancing happens much
> >more frequently than Java processes being killed unexpectedly. On the
> >other hand, it means giving up at-least-once guarantees for message
> >processing when a Java process actually does die unexpectedly.
> >
> >So I see it would be better to create a custom offset tracking & commit
> >component, with some ability, on rebalance, to wait a reasonable amount
> >of time for the consumer threads on the streams to complete their current
> >transactions before committing from a rebalance listener.
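> >
> >Roughly, I imagine something like this untested sketch (all names are my
> >own; offsets follow the "next offset to consume" convention, and -1 means
> >nothing has been consumed yet):
> >
> >import java.util.concurrent.ConcurrentSkipListSet;
> >
> >public class OffsetTracker {
> >    private final ConcurrentSkipListSet<Long> inFlight = new ConcurrentSkipListSet<>();
> >    private volatile long nextOffset = -1L; // offset after the last handed-out message
> >
> >    // Called when a message is handed to a processing thread. Assumes
> >    // calls arrive in increasing offset order, as a stream hands them out.
> >    public void onStart(long offset) {
> >        inFlight.add(offset);
> >        nextOffset = offset + 1;
> >    }
> >
> >    public void onDone(long offset) {
> >        inFlight.remove(offset);
> >    }
> >
> >    // Offset that is safe to commit: everything below the lowest offset
> >    // still being processed is known to be fully processed.
> >    public long safeToCommit() {
> >        for (Long lowest : inFlight) {
> >            return lowest; // first element of the sorted set
> >        }
> >        return nextOffset; // nothing in flight
> >    }
> >
> >    // Called from the rebalance listener: wait (bounded) for current
> >    // transactions to drain before committing safeToCommit().
> >    public boolean awaitDrain(long timeoutMs) throws InterruptedException {
> >        long deadline = System.currentTimeMillis() + timeoutMs;
> >        while (!inFlight.isEmpty() && System.currentTimeMillis() < deadline) {
> >            Thread.sleep(10);
> >        }
> >        return inFlight.isEmpty();
> >    }
> >}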
> >
> >Is it OK to block for a second or two
> >in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
> >processing threads to complete? Will this hold up the whole cluster's
> >rebalancing?
> >
> >The new KafkaConsumer code doesn't appear to do a commit in the same way
> >during rebalance, when autocommit is enabled. So if current users of the
> >high level consumer switch to the new consumer, they might get more
> >duplicates on rebalance, right?
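> >
> >I suppose one could compensate by committing explicitly from the
> >rebalance listener. An untested sketch against the new consumer API as it
> >currently looks in trunk ("my-topic" and the offsets map are just
> >illustration):
> >
> >final Map<TopicPartition, OffsetAndMetadata> processedOffsets =
> >        new ConcurrentHashMap<>();
> >
> >consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
> >    @Override
> >    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> >        // Commit the offsets of fully processed messages before the
> >        // partitions are reassigned. The map (partition -> next offset
> >        // to consume) is maintained by the processing loop (not shown).
> >        consumer.commitSync(processedOffsets);
> >    }
> >
> >    @Override
> >    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> >        // nothing special needed for this example
> >    }
> >});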
> >
> >Regards
> >Carl
> >
> >
> >On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin <jqin@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Carl,
> >>
> >> Generally, your approach works to guarantee at-least-once consumption -
> >> basically people have to commit offsets only after they have processed
> >> the message.
> >>
> >> The only problem is that in the old high-level consumer, during a
> >> consumer rebalance the consumer will (and should) commit offsets. To
> >> guarantee at-least-once delivery and avoid unnecessary duplicates on
> >> rebalance, we should ideally wait until all the messages returned by the
> >> iterator have been processed before committing offsets.
> >>
> >> At LinkedIn, we have a wrapper around the open source consumer iterator
> >> where we can implant that logic.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 6/19/15, 12:22 AM, "Carl Heymann" <ch.heymann@gmail.com> wrote:
> >>
> >> >Thanks Bhavesh.
> >> >
> >> >I understand that getting "exactly once" processing of a message
> >> >requires some de-duplication. What I'm saying is that the current
> >> >high-level consumer, with automatic offset commits enabled, gives
> >> >neither "at most once" nor "at least once" guarantees: a consumer group
> >> >might get duplicate messages, but might also never fully process some
> >> >messages (which is a bigger problem for me).
> >> >
> >> >With the code change I propose, I think it changes to "at least once",
> >> >i.e. one can then do the deduplication you describe, without worrying
> >> >about "losing" messages. Messages should not get committed without
> >> >being fully processed. I want to know if this code change has any
> >> >obvious problems.
> >> >
> >> >Regards
> >> >Carl
> >> >
> >> >
> >> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
> >> ><mistry.p.bhavesh@gmail.com> wrote:
> >> >
> >> >> Hi Carl,
> >> >>
> >> >> Producer-side retries can result in duplicate messages being sent to
> >> >> the brokers (the same message at different offsets). You may also get
> >> >> duplicates when the high-level consumer has processed the data but the
> >> >> offset was not saved or committed before your server restarts, etc.
> >> >>
> >> >> To guarantee at-least-once processing across partitions (and across
> >> >> servers), you will need to store a message hash or primary key in a
> >> >> distributed LRU cache (with an eviction policy), like Hazelcast
> >> >> <http://www.hazelcast.com>, and do de-duplication across partitions.
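> >> >>
> >> >> For example, a rough, untested sketch (assuming Hazelcast 3.x, with
> >> >> the IMap TTL acting as the eviction policy; the map name and key
> >> >> scheme are up to you):
> >> >>
> >> >> import java.util.concurrent.TimeUnit;
> >> >> import com.hazelcast.core.Hazelcast;
> >> >> import com.hazelcast.core.HazelcastInstance;
> >> >> import com.hazelcast.core.IMap;
> >> >>
> >> >> public class Deduplicator {
> >> >>     private final IMap<String, Boolean> seen;
> >> >>
> >> >>     public Deduplicator(HazelcastInstance hz) {
> >> >>         // Distributed map shared by all consumer instances.
> >> >>         this.seen = hz.getMap("processed-message-keys");
> >> >>     }
> >> >>
> >> >>     // True only for the first caller with this key; entries expire
> >> >>     // after the TTL, so the cache does not grow without bound.
> >> >>     public boolean firstTime(String messageKey) {
> >> >>         return seen.putIfAbsent(messageKey, Boolean.TRUE, 1, TimeUnit.HOURS) == null;
> >> >>     }
> >> >> }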
> >> >>
> >> >> I hope this helps!
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Bhavesh
> >> >>
> >> >>
> >> >> On Wed, Jun 17, 2015 at 1:49 AM, yewton <yewton@gmail.com> wrote:
> >> >>
> >> >> > So Carl Heymann's ConsumerIterator.next hack approach is not
> >> >> > reasonable?
> >> >> >
> >> >> > On 2015-06-17 08:12:50 +0000, in the message above, Stevo Slavić wrote:
> >> >> >
> >> >> >> With auto-commit one can only have an at-most-once delivery
> >> >> >> guarantee - after the commit but before the message is delivered
> >> >> >> for processing, or even after it is delivered but before it is
> >> >> >> processed, things can fail, causing the event not to be processed,
> >> >> >> which is basically the same outcome as if it was not delivered.
> >> >> >>
> >> >> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann <ch.heymann@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >> > Hi
> >> >> >> >
> >> >> >> > ** Disclaimer: I know there's a new consumer API on the way, this
> >> >> >> > mail is about the currently available API. I also apologise if the
> >> >> >> > below has already been discussed previously. I did try to check
> >> >> >> > previous discussions on ConsumerIterator **
> >> >> >> >
> >> >> >> > It seems to me that the high-level consumer would be able to
> >> >> >> > support at-least-once messaging, even if one uses auto-commit, by
> >> >> >> > changing kafka.consumer.ConsumerIterator.next() to call
> >> >> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
> >> >> >> > way, a consumer thread for a KafkaStream could just loop:
> >> >> >> >
> >> >> >> > while (true) {
> >> >> >> >     MyMessage message = iterator.next().message();
> >> >> >> >     process(message);
> >> >> >> > }
> >> >> >> >
> >> >> >> > Each call to "iterator.next()" then updates the offset to commit to
> >> >> >> > the end of the message that was just processed. When offsets are
> >> >> >> > committed for the ConsumerConnector (either automatically or
> >> >> >> > manually), the commit will not include offsets of messages that
> >> >> >> > haven't been fully processed.
> >> >> >> >
> >> >> >> > I've tested the following ConsumerIterator.next(), and it seems to
> >> >> >> > work as I expect:
> >> >> >> >
> >> >> >> >   override def next(): MessageAndMetadata[K, V] = {
> >> >> >> >     // New code: reset consumer offset to the end of the previously
> >> >> >> >     // consumed message:
> >> >> >> >     if (consumedOffset > -1L && currentTopicInfo != null) {
> >> >> >> >       currentTopicInfo.resetConsumeOffset(consumedOffset)
> >> >> >> >       val topic = currentTopicInfo.topic
> >> >> >> >       trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
> >> >> >> >     }
> >> >> >> >
> >> >> >> >     // Old code, excluding reset:
> >> >> >> >     val item = super.next()
> >> >> >> >     if (consumedOffset < 0)
> >> >> >> >       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
> >> >> >> >     val topic = currentTopicInfo.topic
> >> >> >> >     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
> >> >> >> >     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
> >> >> >> >     item
> >> >> >> >   }
> >> >> >> >
> >> >> >> > I've seen several people asking about managing commit offsets
> >> >> >> > manually with the high-level consumer. I suspect that this approach
> >> >> >> > (the modified ConsumerIterator) would scale better than having a
> >> >> >> > separate ConsumerConnector per stream just so that you can commit
> >> >> >> > offsets with at-least-once semantics. The downside of this approach
> >> >> >> > is more duplicate deliveries after recovery from hard failure (but
> >> >> >> > this is "at least once", right, not "exactly once").
> >> >> >> >
> >> >> >> > I don't propose that the code necessarily be changed like this in
> >> >> >> > trunk, I just want to know if the approach seems reasonable.
> >> >> >> >
> >> >> >> > Regards
> >> >> >> > Carl Heymann
