kafka-users mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: Kafka Streams incorrect aggregation results when re-balancing occurs
Date Tue, 20 Aug 2019 19:28:08 GMT
Hi Alex,

What you describe, failing before the offsets are committed, is one of
the reasons records are processed multiple times under the
at-least-once processing guarantee. That is a fact of life, as you
stated. Kafka Streams in exactly-once mode guarantees that such
duplicate state updates do not happen.

The exactly-once processing guarantee was implemented in Kafka Streams
for use cases where correctness is of the highest importance.
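
For reference, enabling it is a single config change. A minimal sketch;
the application id and bootstrap servers below are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "counts-app");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // The default guarantee is at_least_once; this switches the
    // application to transactional, exactly-once processing:
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Note that exactly_once requires brokers on version 0.11.0 or newer, and
it trades some throughput and latency for correctness, since commits
happen inside Kafka transactions.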

Best,
Bruno



On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <brekkal@gmail.com> wrote:
>
> Hi all, I have a (relatively) simple streams topology that is producing
> some counts, and while testing this code I'm seeing some occasional
> incorrect aggregation results.  This seems to happen when a re-balance
> occurs - typically due to a timeout or communication hiccup with the Kafka
> broker.  The topology is built with the DSL, and utilizes 2 KTables: the
> first is really just a de-dup table and the second is the result of the
> aggregation.  So at a high level the topology consumes from a source topic,
> calls groupByKey() and then does a reduce() where we always return the
> newValue.  Then it does a groupBy() on a new key, and finally an
> aggregate() call with an adder and subtractor.  Because our source topic
> frequently contains duplicate messages, this seemed like a good way to
> handle the dupes: any time we replace a value in the "upstream" KTable, the
> subtractor removes the old value from the count and the adder then adds the
> new value back in.
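
A minimal sketch of the topology as described; the topic name, the
String serdes, and the extractNewKey() helper are hypothetical:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    StreamsBuilder builder = new StreamsBuilder();

    // De-dup KTable: the reduce keeps only the newest value per key.
    KTable<String, String> deduped = builder
        .stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .reduce((oldValue, newValue) -> newValue);

    // Re-key and count: when a value is replaced upstream, the subtractor
    // removes the old value's contribution and the adder adds the new one.
    // extractNewKey() is a hypothetical helper deriving the new grouping key.
    KTable<String, Long> counts = deduped
        .groupBy((key, value) -> KeyValue.pair(extractNewKey(value), value),
                 Grouped.with(Serdes.String(), Serdes.String()))
        .aggregate(
            () -> 0L,                              // initializer
            (newKey, value, total) -> total + 1,   // adder
            (newKey, value, total) -> total - 1,   // subtractor
            Materialized.with(Serdes.String(), Serdes.Long()));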
>
> In the happy-path scenario where we never see any exceptions or rebalances,
> this whole thing works great - the counts at the end are 100% correct.  But
> when rebalancing is triggered we sometimes get bad counts. My theory is
> that when a timeout or connectivity problem happens during that second
> aggregation, the data ends up getting saved to the state store but the
> offsets don't get committed and the message(s) in the repartition topic
> that feed the aggregation get replayed after the stream task gets
> rebalanced, causing the counts to get incorrectly incremented or
> decremented (depending on whether the message triggered the adder or
> the subtractor).  I can simulate this problem (or something similar to this
> problem) when debugging the application in my IDE just by pausing execution
> on a breakpoint inside the aggregation's adder or subtractor method for a
> few seconds.  The result of the adder or subtractor gets saved to the state
> store which means that when the messages in the repartition topic get
> re-processed, the counts get doubled.  If I enable "exactly_once"
> processing, I'm unable to recreate the problem and the counts are always
> accurate.
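
To make the replay concrete, a hypothetical trace: suppose the count for
some key is 5 and a new record arrives. The adder runs, so the state
store now holds 6, but the task dies before the offset is committed.
After the rebalance the same repartition record is redelivered, the
adder runs again against the already-updated store, and the count lands
at 7 instead of 6.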
>
> My questions are:
>
> 1.  Is this expected behavior?  In a hostile application environment where
> connectivity problems and rebalances happen frequently, is some degree of
> incorrectly aggregated data just a fact of life?
>
> 2.  Is exactly_once processing the right solution if correctness is of the
> highest importance?  Or should I be looking at different ways of writing
> the topology?
>
> Thanks for any advice!
>
> Alex
