kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Kafka Streams incorrect aggregation results when re-balancing occurs
Date Wed, 21 Aug 2019 06:00:55 GMT
> So with exactly_once, it must roll-back commit(s) to the state store in a
> failure scenario?

Yes. Dirty writes into the stores are "cleaned up" if you enable
exactly-once processing semantics.

"Commits" are never rolled back, as a commit indicates successful
processing :)
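
A minimal sketch of enabling it, assuming the configuration is built with a plain java.util.Properties object. The application id and broker address are placeholders, not values from this thread; "processing.guarantee" is the key behind StreamsConfig.PROCESSING_GUARANTEE_CONFIG.

```java
import java.util.Properties;

public class ExactlyOnceConfig {
    // Builds a Kafka Streams configuration with exactly-once enabled.
    // "counts-app" and "localhost:9092" are hypothetical placeholders.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "counts-app");
        props.put("bootstrap.servers", "localhost:9092");
        // The default is "at_least_once"; "exactly_once" makes state store
        // updates, output records, and offset commits atomic.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("processing.guarantee"));
    }
}
```

The resulting Properties would be passed to the KafkaStreams constructor alongside the topology.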


On 8/20/19 8:07 PM, Alex Brekken wrote:
> Thanks guys.  I knew that re-processing messages was a possibility with
> at_least_once processing, but I guess I hadn't considered the potential
> impact on the state stores as far as aggregations are concerned.  So with
> exactly_once, it must roll-back commit(s) to the state store in a failure
> scenario?  I haven't dug into the code to see how it works, but given the
> behavior I'm seeing it must..
> Tim - I actually saw your related question from last week right after I
> sent mine.  :)
> Alex
> On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna <bruno@confluent.io> wrote:
>> Hi Alex,
>> what you describe about failing before offsets are committed is one
>> reason why records are processed multiple times under the
>> at-least-once processing guarantee. That is a reality of life, as you
>> stated. Kafka Streams in exactly-once mode guarantees that these
>> duplicate state updates do not happen.
>> The exactly-once processing guarantee was implemented in Kafka Streams
>> for use cases where correctness is of highest importance.
>> Best,
>> Bruno
>> On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken <brekkal@gmail.com> wrote:
>>> Hi all, I have a (relatively) simple streams topology that is producing
>>> some counts, and while testing this code I'm seeing some occasional
>>> incorrect aggregation results.  This seems to happen when a re-balance
>>> occurs - typically due to a timeout or communication hiccup with the
>>> Kafka broker.  The topology is built with the DSL, and utilizes 2
>>> KTables: the first is really just a de-dup table and the second is the
>>> result of the aggregation.  So at a high level the topology consumes from
>>> a source topic, does a groupByKey() and then a reduce() where we always
>>> return the newValue.  Then it does a groupBy() on a new key, and finally
>>> an aggregate() call with an adder and subtractor.  Because our source
>>> topic frequently contains duplicate messages, this seemed like a good way
>>> to handle the dupes: the subtractor gets invoked anytime we replace a
>>> value in the "upstream" KTable and removes it from the count, then the
>>> adder adds it back again.
>>> In the happy-path scenario where we never see any exceptions or
>>> rebalances, this whole thing works great - the counts at the end are 100%
>>> correct.  But when rebalancing is triggered we sometimes get bad counts.
>>> My theory is that when a timeout or connectivity problem happens during
>>> that second aggregation, the data ends up getting saved to the state
>>> store but the offsets don't get committed, and the message(s) in the
>>> repartition topic that feed the aggregation get replayed after the stream
>>> task gets rebalanced, causing the counts to get incorrectly incremented
>>> or decremented (depending on whether the message was triggering the adder
>>> or the subtractor).  I can simulate this problem (or something similar to
>>> this problem) when debugging the application in my IDE just by pausing
>>> execution on a breakpoint inside the aggregation's adder or subtractor
>>> method for a few seconds.  The result of the adder or subtractor gets
>>> saved to the state store, which means that when the messages in the
>>> repartition topic get re-processed, the counts get doubled.  If I enable
>>> "exactly_once" processing, I'm unable to recreate the problem and the
>>> counts are always accurate.
>>> My questions are:
>>> 1.  Is this expected behavior? In a hostile application environment where
>>> connectivity problems and rebalances happen frequently, is some degree of
>>> incorrectly aggregated data just a reality of life?
>>> 2.  Is exactly_once processing the right solution if correctness is of
>>> highest importance?  Or should I be looking at different ways of writing
>>> the topology?
>>> Thanks for any advice!
>>> Alex
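
The replay theory described above can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions, not how Kafka Streams is implemented: the HashMap stands in for the state store, the `committed` index stands in for the committed offset, and the snapshot restore is a simplified stand-in for discarding uncommitted writes under exactly-once. All names here (ReplaySketch, run, crashAt) are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {
    // Counts occurrences per key. If crashAt >= 0, the task "dies" once, right
    // after updating the store for that record but before committing its offset.
    static Map<String, Long> run(List<String> input, int crashAt, boolean exactlyOnce) {
        Map<String, Long> store = new HashMap<>();
        int committed = 0;          // next offset to process after a restart
        boolean crashed = false;
        while (committed < input.size()) {
            int i = committed;
            Map<String, Long> snapshot = new HashMap<>(store);
            store.merge(input.get(i), 1L, Long::sum);   // the "adder"
            if (i == crashAt && !crashed) {
                crashed = true;
                if (exactlyOnce) {
                    store = snapshot;   // dirty write is discarded
                }
                continue;               // offset not committed: record is replayed
            }
            committed = i + 1;          // store update and offset advance together
        }
        return store;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "a", "a");
        System.out.println("at-least-once: " + run(input, 1, false).get("a"));
        System.out.println("exactly-once:  " + run(input, 1, true).get("a"));
    }
}
```

With three records for key "a" and a crash on the second one, the at-least-once run ends with a count of 4 because the replayed record is added twice, while the exactly-once run ends with the correct count of 3.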
