kafka-users mailing list archives

From: Bruno Cadonna <br...@confluent.io>
Subject: Re: Kafka Streams incorrect aggregation results when re-balancing occurs
Date: Wed, 21 Aug 2019 07:08:41 GMT
Hi Alex,

If you are interested in understanding exactly-once in a bit more
detail, I recommend watching the following Kafka Summit talk by
Matthias:

https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka

Best,
Bruno

On Wed, Aug 21, 2019 at 5:07 AM Alex Brekken <brekkal@gmail.com> wrote:
>
> Thanks guys.  I knew that re-processing messages was a possibility with
> at_least_once processing, but I guess I hadn't considered the potential
> impact on the state stores as far as aggregations are concerned.  So with
> exactly_once, it must roll back commit(s) to the state store in a failure
> scenario?  I haven't dug into the code to see how it works, but given the
> behavior I'm seeing, it must...
>
> Tim - I actually saw your related question from last week right after I
> sent mine.  :)
>
> Alex
>
> On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna <bruno@confluent.io> wrote:
>
> > Hi Alex,
> >
> > What you describe about failing before offsets are committed is one
> > reason why records are processed multiple times under the
> > at-least-once processing guarantee. That is, as you stated, a reality
> > of life. Kafka Streams in exactly-once mode guarantees that such
> > duplicate state updates do not happen.
> >
> > The exactly-once processing guarantee was implemented in Kafka Streams
> > for use cases where correctness is of highest importance.
> >
> > Best,
> > Bruno
> >
> >
> >
> > On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <brekkal@gmail.com> wrote:
> > >
> > > Hi all, I have a (relatively) simple streams topology that is producing
> > > some counts, and while testing this code I'm seeing some occasional
> > > incorrect aggregation results.  This seems to happen when a re-balance
> > > occurs - typically due to a timeout or communication hiccup with the
> > > Kafka broker.  The topology is built with the DSL, and utilizes 2
> > > KTables: the first is really just a de-dup table and the second is the
> > > result of the aggregation.  So at a high level the topology consumes
> > > from a source topic, does a groupByKey() and then a reduce() where we
> > > always return the newValue.  Then it does a groupBy() on a new key, and
> > > finally an aggregate() call with an adder and subtractor.  Because our
> > > source topic frequently contains duplicate messages, this seemed like a
> > > good way to handle the dupes: the subtractor gets invoked anytime we
> > > replace a value in the "upstream" KTable and removes it from the count,
> > > then adds it back again in the adder.
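> > >
> > > Roughly, the topology looks like the sketch below (the topic name, the
> > > String types/serdes, and the particular re-keying are placeholders
> > > here, not our real ones):
> > >
> > >     import org.apache.kafka.common.serialization.Serdes;
> > >     import org.apache.kafka.streams.KeyValue;
> > >     import org.apache.kafka.streams.StreamsBuilder;
> > >     import org.apache.kafka.streams.kstream.Consumed;
> > >     import org.apache.kafka.streams.kstream.KTable;
> > >     import org.apache.kafka.streams.kstream.Materialized;
> > >
> > >     StreamsBuilder builder = new StreamsBuilder();
> > >
> > >     // first KTable: de-dup by always keeping the latest value per key
> > >     KTable<String, String> deduped = builder
> > >             .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
> > >             .groupByKey()
> > >             .reduce((oldValue, newValue) -> newValue);
> > >
> > >     // second KTable: re-key on a new grouping key, then count with an
> > >     // adder and a subtractor so a replaced upstream value is subtracted
> > >     // from its old group before the new value is added
> > >     KTable<String, Long> counts = deduped
> > >             .groupBy((key, value) -> KeyValue.pair(value, value))
> > >             .aggregate(
> > >                     () -> 0L,
> > >                     (key, value, agg) -> agg + 1,   // adder
> > >                     (key, value, agg) -> agg - 1,   // subtractor
> > >                     Materialized.with(Serdes.String(), Serdes.Long()));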
> > >
> > > In the happy-path scenario where we never see any exceptions or
> > > rebalances, this whole thing works great - the counts at the end are
> > > 100% correct.  But when rebalancing is triggered we sometimes get bad
> > > counts.  My theory is that when a timeout or connectivity problem
> > > happens during that second aggregation, the data ends up getting saved
> > > to the state store but the offsets don't get committed, and the
> > > message(s) in the repartition topic that feed the aggregation get
> > > replayed after the stream task gets rebalanced, causing the counts to
> > > get incorrectly incremented or decremented (depending on whether the
> > > message was triggering the adder or the subtractor).  I can simulate
> > > this problem (or something similar to it) when debugging the
> > > application in my IDE just by pausing execution on a breakpoint inside
> > > the aggregation's adder or subtractor method for a few seconds.  The
> > > result of the adder or subtractor gets saved to the state store, which
> > > means that when the messages in the repartition topic get re-processed,
> > > the counts get doubled.  If I enable "exactly_once" processing, I'm
> > > unable to recreate the problem and the counts are always accurate.
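> > >
> > > For reference, switching modes is just the processing.guarantee setting
> > > in the streams config - something like this (the app id and bootstrap
> > > servers are placeholders, and builder is the StreamsBuilder from the
> > > sketch above):
> > >
> > >     import java.util.Properties;
> > >     import org.apache.kafka.streams.KafkaStreams;
> > >     import org.apache.kafka.streams.StreamsConfig;
> > >
> > >     Properties props = new Properties();
> > >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "counts-app");
> > >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >     // at_least_once is the default; this enables the transactional path
> > >     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> > >               StreamsConfig.EXACTLY_ONCE);
> > >     KafkaStreams streams = new KafkaStreams(builder.build(), props);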
> > >
> > > My questions are:
> > >
> > > 1.  Is this expected behavior? In a hostile application environment where
> > > connectivity problems and rebalances happen frequently, is some degree of
> > > incorrectly aggregated data just a reality of life?
> > >
> > > 2.  Is exactly_once processing the right solution if correctness is of
> > > highest importance?  Or should I be looking at different ways of writing
> > > the topology?
> > >
> > > Thanks for any advice!
> > >
> > > Alex
> >
