kafka-users mailing list archives

From Alex Brekken <brek...@gmail.com>
Subject Re: Kafka Streams incorrect aggregation results when re-balancing occurs
Date Wed, 21 Aug 2019 03:07:22 GMT
Thanks guys.  I knew that re-processing messages was a possibility with
at_least_once processing, but I guess I hadn't considered the potential
impact on the state stores as far as aggregations are concerned.  So with
exactly_once, it must roll back commit(s) to the state store in a failure
scenario?  I haven't dug into the code to see how it works, but given the
behavior I'm seeing it must.
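A minimal sketch of the difference asked about above, in plain Java with no Kafka dependency. The class `CommitModel` and its methods are hypothetical. A task keeps a running count (standing in for the state store) and a committed input offset. In the at-least-once branch the store update is durable before the offset commit, so a crash between the two replays the record. In the transactional branch the count and the offset become durable together or not at all. This is a simplified model, not Kafka Streams internals: roughly, in exactly-once mode Kafka Streams writes changelog records, output records and consumer offsets inside one Kafka transaction, and after an abort the uncommitted store updates are discarded and state is rebuilt from the committed changelog.

```java
import java.util.List;

class CommitModel {
    // Durable state: what survives a crash (state store contents and committed offset).
    static final class Durable {
        long count = 0;          // aggregated count in the state store
        int committedOffset = 0; // next input offset to read after a restart
    }

    // Processes records from the committed offset. If crashAt is a valid index, the task
    // "crashes" after computing the update for that record but before committing its offset.
    static void run(Durable d, List<Integer> input, boolean transactional, int crashAt) {
        long pendingCount = d.count; // uncommitted working copy (transactional mode only)
        for (int i = d.committedOffset; i < input.size(); i++) {
            if (transactional) {
                pendingCount += input.get(i);
                if (i == crashAt) return; // abort: pendingCount is thrown away
                d.count = pendingCount;   // state and offset are committed together
                d.committedOffset = i + 1;
            } else {
                d.count += input.get(i);  // store write is durable immediately
                if (i == crashAt) return; // crash before the offset commit
                d.committedOffset = i + 1;
            }
        }
    }

    static long countAfterCrashAndRestart(boolean transactional) {
        List<Integer> input = List.of(1, 1, 1); // three "+1" records; correct total is 3
        Durable d = new Durable();
        run(d, input, transactional, 1);  // crash while handling the second record
        run(d, input, transactional, -1); // restart and resume from the committed offset
        return d.count;
    }

    public static void main(String[] args) {
        System.out.println("at-least-once: " + countAfterCrashAndRestart(false));
        System.out.println("transactional: " + countAfterCrashAndRestart(true));
    }
}
```

Running `main` prints a count of 4 for the at-least-once branch (the second record is applied twice) and 3 for the transactional branch.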

Tim - I actually saw your related question from last week right after I
sent mine.  :)
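The switch discussed in this thread is a single Kafka Streams setting, `processing.guarantee` (exposed as `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`). The sketch below builds the properties with plain string keys so it compiles without the Kafka client on the classpath. The class name, application id and broker address are placeholders. The default is `at_least_once`. Kafka Streams 3.0 and later accept `exactly_once_v2` (which requires brokers 2.5 or newer), while older clients, including the 2.x releases current when this thread was written, use `exactly_once`.

```java
import java.util.Properties;

class GuaranteeConfig {
    // Builds Kafka Streams properties using the documented config names as plain strings.
    static Properties streamsProps(boolean exactlyOnce) {
        Properties props = new Properties();
        props.put("application.id", "counts-app");        // placeholder application id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // Default is "at_least_once"; on Kafka Streams clients older than 3.0 use "exactly_once".
        props.put("processing.guarantee", exactlyOnce ? "exactly_once_v2" : "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        streamsProps(true).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that enabling exactly-once also lowers the default `commit.interval.ms` from 30000 ms to 100 ms, so commits (and transactions) happen much more often.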


On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna <bruno@confluent.io> wrote:

> Hi Alex,
> what you describe about failing before offsets are committed is one
> reason why records are processed multiple times under the
> at-least-once processing guarantee. That is a reality of life, as you
> stated. Kafka Streams in exactly-once mode guarantees that these
> duplicate state updates do not happen.
> The exactly-once processing guarantee was implemented in Kafka Streams
> for use cases where correctness is of the highest importance.
> Best,
> Bruno
> On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <brekkal@gmail.com> wrote:
> >
> > Hi all, I have a (relatively) simple streams topology that is producing
> > some counts, and while testing this code I'm seeing some occasional
> > incorrect aggregation results.  This seems to happen when a rebalance
> > occurs, typically due to a timeout or communication hiccup with the
> > Kafka broker.  The topology is built with the DSL and uses two KTables:
> > the first is really just a de-dup table and the second is the result of
> > the aggregation.  So at a high level the topology consumes from a source
> > topic, calls groupByKey() and then does a reduce() where we always return
> > the newValue.  Then it does a groupBy() on a new key, and finally an
> > aggregate() call with an adder and subtractor.  Because our source topic
> > frequently contains duplicate messages, this seemed like a good way to
> > handle the dupes: the subtractor gets invoked any time we replace a value
> > in the "upstream" KTable and removes it from the count, then the adder
> > adds it back again.
> >
> > In the happy-path scenario where we never see any exceptions or
> > rebalances, this whole thing works great: the counts at the end are 100%
> > correct.  But when rebalancing is triggered we sometimes get bad counts.
> > My theory is that when a timeout or connectivity problem happens during
> > that second aggregation, the data ends up getting saved to the state
> > store but the offsets don't get committed, and the message(s) in the
> > repartition topic that feed the aggregation get replayed after the
> > stream task gets rebalanced, causing the counts to be incorrectly
> > incremented or decremented (depending on whether the message was
> > triggering the adder or the subtractor).  I can simulate this problem
> > (or something similar to it) when debugging the application in my IDE
> > just by pausing execution on a breakpoint inside the aggregation's adder
> > or subtractor method for a few seconds.  The result of the adder or
> > subtractor gets saved to the state store, which means that when the
> > messages in the repartition topic get re-processed, the counts get
> > doubled.  If I enable "exactly_once" processing, I'm unable to recreate
> > the problem and the counts are always accurate.
> >
> > My questions are:
> >
> > 1.  Is this expected behavior? In a hostile application environment where
> > connectivity problems and rebalances happen frequently, is some degree of
> > incorrectly aggregated data just a reality of life?
> >
> > 2.  Is exactly_once processing the right solution if correctness is of
> > highest importance?  Or should I be looking at different ways of writing
> > the topology?
> >
> > Thanks for any advice!
> >
> > Alex
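The topology and failure theory described in the original question can be modelled without Kafka to see why a replay skews the counts. In this hypothetical `DedupCountModel`, `dedupStage` plays the role of the `groupByKey().reduce()` de-dup table. Assuming for simplicity that every replacement is forwarded as a separate subtract record followed by an add record, it writes those deltas to an in-memory list standing in for the repartition topic. `aggregateWithReplay` plays the `aggregate()` with adder and subtractor: it stops right after applying the record at index `crashAt`, before committing its offset, then resumes from the last committed offset, so that one record is applied twice. The record layout (source key, plus a value that doubles as the new group key) is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DedupCountModel {
    // One record on the simulated repartition topic: +1 (adder) or -1 (subtractor) for a group key.
    static final class Delta {
        final String groupKey;
        final int sign;
        Delta(String groupKey, int sign) { this.groupKey = groupKey; this.sign = sign; }
    }

    // De-dup table: keeps the latest value per source key and emits subtract/add deltas.
    static List<Delta> dedupStage(List<String[]> source) {
        Map<String, String> latest = new HashMap<>();
        List<Delta> repartition = new ArrayList<>();
        for (String[] rec : source) {                           // rec = {sourceKey, value}
            String old = latest.put(rec[0], rec[1]);
            if (old != null) repartition.add(new Delta(old, -1)); // replaced value is subtracted
            repartition.add(new Delta(rec[1], +1));               // new value is added
        }
        return repartition;
    }

    static void apply(Map<String, Integer> counts, Delta d) {
        counts.merge(d.groupKey, d.sign, Integer::sum);
    }

    // Aggregation: the store update for record crashAt is kept but its offset is not
    // committed, so it is re-applied after the restart. crashAt = -1 means no failure.
    static Map<String, Integer> aggregateWithReplay(List<Delta> repartition, int crashAt) {
        Map<String, Integer> counts = new HashMap<>(); // state store, survives the restart
        int committed = 0;                               // next offset to read after restart
        for (int i = 0; i < repartition.size(); i++) {
            apply(counts, repartition.get(i));
            if (i == crashAt) break;                     // rebalance before the commit
            committed = i + 1;
        }
        for (int i = committed; i < repartition.size(); i++) {
            apply(counts, repartition.get(i));           // resume from the committed offset
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> source = List.of(
                new String[] {"order-1", "red"},
                new String[] {"order-2", "blue"},
                new String[] {"order-1", "red"}); // duplicate of the first record
        List<Delta> repartition = dedupStage(source);
        System.out.println("no failure:          " + aggregateWithReplay(repartition, -1));
        System.out.println("adder replayed:      " + aggregateWithReplay(repartition, 3));
        System.out.println("subtractor replayed: " + aggregateWithReplay(repartition, 2));
    }
}
```

With the duplicate `order-1` record, the repartition list holds four deltas (+red, +blue, -red, +red). Without a failure both counts end at 1; replaying the final adder record leaves red at 2, and replaying the subtractor record leaves red at 0, matching the over- and under-counts described above. Committing the store update and the offset atomically, as exactly-once mode does, removes the duplicate application.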
