kafka-users mailing list archives

From Tim Ward <tim.w...@origamienergy.com.INVALID>
Subject RE: Kafka Streams incorrect aggregation results when re-balancing occurs
Date Tue, 20 Aug 2019 08:13:05 GMT
I asked an essentially similar question a week or two ago. The answer was "this is expected
behaviour unless you switch on exactly-once processing".
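If it helps, the switch being referred to is the Streams `processing.guarantee` config (`StreamsConfig.PROCESSING_GUARANTEE_CONFIG` in Java). A minimal fragment, assuming a broker version of 0.11 or later; check the docs for your exact Streams release:

```properties
# Enables transactional (exactly-once) processing in Kafka Streams
processing.guarantee=exactly_once
```

Note it trades some throughput and latency for the stronger guarantee.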

(In my case the problem went away when I changed the topology, which I had to do anyway for other, unconnected reasons: the requirements for the application changed while I was part way through writing it.)

Tim Ward

-----Original Message-----
From: Alex Brekken <brekkal@gmail.com>
Sent: 19 August 2019 20:24
To: users@kafka.apache.org
Subject: Kafka Streams incorrect aggregation results when re-balancing occurs

Hi all, I have a (relatively) simple streams topology that is producing
some counts, and while testing this code I'm seeing occasional
incorrect aggregation results.  This seems to happen when a re-balance
occurs - typically due to a timeout or communication hiccup with the Kafka
broker.  The topology is built with the DSL and utilizes 2 KTables: the
first is really just a de-dup table, and the second is the result of the
aggregation.  So at a high level the topology consumes from a source topic,
calls groupByKey() and then reduce(), where we always return the
newValue.  Then it does a groupBy() on a new key, and finally an
aggregate() call with an adder and subtractor.  Because our source topic
frequently contains duplicate messages, this seemed like a good way to
handle the dupes: whenever a value is replaced in the "upstream" KTable,
the subtractor removes the old value's contribution from the count and the
adder adds the new value's contribution back in.
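To make the pattern concrete, here is a minimal in-memory sketch of that counting logic - plain Python rather than the actual Kafka Streams DSL, and all names here are illustrative:

```python
def count_with_dedup(records):
    """Model of the reduce() + aggregate(adder, subtractor) pattern.

    records: iterable of (source_key, group_key) pairs, possibly
    containing duplicates. Returns a dict of counts per group_key.
    """
    latest = {}  # plays the role of the de-dup KTable (reduce keeps newValue)
    counts = {}  # plays the role of the aggregation KTable

    for key, group in records:
        old = latest.get(key)
        if old is not None:
            # subtractor: the old value is being replaced, remove its count
            counts[old] -= 1
        latest[key] = group
        # adder: count the (possibly duplicate) new value
        counts[group] = counts.get(group, 0) + 1
    return counts
```

A duplicate record triggers both callbacks for the same group key, so the count is unchanged - which is exactly why the pattern handles dupes in the happy path.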

In the happy-path scenario where we never see any exceptions or rebalances,
this whole thing works great - the counts at the end are 100% correct.  But
when rebalancing is triggered we sometimes get bad counts.  My theory is
that when a timeout or connectivity problem happens during that second
aggregation, the data ends up getting saved to the state store but the
offsets don't get committed.  The message(s) in the repartition topic that
feed the aggregation then get replayed after the stream task gets
rebalanced, causing the counts to be incorrectly incremented or
decremented, depending on whether the message triggered the adder or the
subtractor.  I can simulate this problem (or something similar to it) when
debugging the application in my IDE just by pausing execution on a
breakpoint inside the aggregation's adder or subtractor method for a few
seconds.  The result of the adder or subtractor gets saved to the state
store, which means that when the messages in the repartition topic get
re-processed, the counts get doubled.  If I enable "exactly_once"
processing, I'm unable to recreate the problem and the counts are always
accurate.
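That breakpoint experiment can be modelled in a few lines: under at-least-once semantics the state-store update can survive a failed offset commit, so the replayed record runs through the adder a second time. A toy simulation (the names are illustrative, not Kafka APIs):

```python
def simulate(group_keys, crash_after_processing=None):
    """Replay group_keys through a toy 'adder', optionally crashing once
    after processing the record at the given index but before its offset
    is committed (the state survives, the offset commit does not)."""
    counts = {}
    committed = 0  # last committed offset
    offset = 0
    while offset < len(group_keys):
        g = group_keys[offset]
        counts[g] = counts.get(g, 0) + 1   # adder updates the state store
        if offset == crash_after_processing:
            crash_after_processing = None  # crash only once
            offset = committed             # rebalance: resume from last commit
            continue
        offset += 1
        committed = offset                 # offset commit succeeds
    return counts
```

Without a crash, two records give a count of 2; crash the commit after the second record and the replay pushes the count to 3. Under exactly_once the store update and the offset commit succeed or fail atomically, so the replayed record's first pass is discarded and the double-count cannot happen.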

My questions are:

1.  Is this expected behavior? In a hostile application environment where
connectivity problems and rebalances happen frequently, is some degree of
incorrectly aggregated data just a fact of life?

2.  Is exactly_once processing the right solution if correctness is of the
highest importance?  Or should I be looking at different ways of writing
the topology?

Thanks for any advice!

Alex