kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alex Brekken <brek...@gmail.com>
Subject Kafka Streams incorrect aggregation results when re-balancing occurs
Date Mon, 19 Aug 2019 19:24:15 GMT
Hi all, I have a (relatively) simple streams topology that is producing
some counts, and while testing this code I'm seeing some occasional
incorrect aggregation results.  This seems to happen when a re-balance
occurs - typically due to a timeout or communication hiccup with the Kafka
broker.  The topology is built with the DSL, and utilizes 2 KTables: the
first is really just a de-dup table and the second is the result of the
aggregation.  So at a high level the topology consumes from a source topic,
  groupsByKey() and then does a reduce() where we always return the
newValue.  Then it does a groupBy() on a new key, and finally an
aggregate() call with an adder and subtractor.  Because our source topic
frequently contains duplicate messages, this seemed like a good way to
handle the dupes: the subtractor gets invoked anytime we replace a value in
the "upstream" KTable and removes it from the count, then adds it back
again in the adder.

In the happy-path scenario where we never see any exceptions or rebalances,
this whole thing works great - the counts at the end are 100% correct.  But
when rebalancing is triggered we sometimes get bad counts. My theory is
that when a timeout or connectivity problem happens during that second
aggregation, the data ends up getting saved to the state store but the
offsets don't get committed and the message(s) in the repartition topic
that feed the aggregation get replayed after the stream task gets
rebalanced, causing the counts to get incorrectly incremented or
decremented.  (depending on whether the message was triggering the adder or
the subtractor)  I can simulate this problem (or something similar to this
problem) when debugging the application in my IDE just by pausing execution
on a breakpoint inside the aggregation's adder or subtractor method for a
few seconds.  The result of the adder or subtractor gets saved to the state
store which means that when the messages in the repartition topic get
re-processed, the counts get doubled.  If I enable "exactly_once"
processing, I'm unable to recreate the problem and the counts are always

My questions are:

1.  Is this expected behavior? In a hostile application environment where
connectivity problems and rebalances happen frequently, is some degree of
incorrectly aggregated data just a reality of life?

2.  Is exactly_once processing the right solution if correctness is of
highest importance?  Or should I be looking at different ways of writing
the topology?

Thanks for any advice!


  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message