kafka-users mailing list archives

From Bruno Cadonna <br...@confluent.io>
Subject Re: KSTREAM-AGGREGATE-STATE-STORE persistence?
Date Mon, 12 Aug 2019 13:17:44 GMT
Hi Tim,

Kafka Streams guarantees at-least-once processing semantics by
default. That means a record is processed (e.g. added to an
aggregate) at least once but might be processed more than once.
Crashes like the one you described are exactly what causes the same
record to be processed multiple times. Exactly-once processing
guarantees need to be turned on explicitly in Kafka Streams.
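
For example, with the Java Properties-based config (a minimal sketch;
the application id, bootstrap servers and class name below are
placeholders, not taken from your application):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfig {

    public static Properties streamsConfig() {
        Properties props = new Properties();
        // Placeholders - use your own application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The default is "at_least_once"; exactly-once has to be enabled explicitly.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}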

See the following links for more information:
https://kafka.apache.org/23/documentation/streams/core-concepts#streams_processing_guarantee
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
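
As an aside, here is a minimal sketch of the kind of windowed
aggregation described below (topic name, serdes, window size and the
summing aggregator are illustrative, not taken from your code); under
at-least-once, the Aggregator in such a topology can be handed the
same input record again after a crash and restart:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedAggregationSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Topic name, serdes and window size are made up for the example.
        KStream<String, Long> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()));

        KTable<Windowed<String>, Long> totals = input
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(
                () -> 0L,                                      // initializer
                (key, value, aggregate) -> aggregate + value,  // Aggregator.apply()
                Materialized.with(Serdes.String(), Serdes.Long()));
    }
}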

Best,
Bruno

On Mon, Aug 12, 2019 at 2:38 PM Tim Ward
<tim.ward@origamienergy.com.invalid> wrote:
>
> I believe I have witnessed - at least twice - something like the following happening,
> in a Kafka Streams application where I have a .groupByKey().windowedBy().aggregate() sequence.
>
>
>   *   Application runs for a while
>   *   Application crashes
>   *   Application restarts
>   *   Aggregator.apply() is called to aggregate an input message *that has already been
> included in the aggregate*
>
> It looks like when the application crashes, the KSTREAM-AGGREGATE-STATE-STORE has been
> persisted after message X has been aggregated but message X has not been committed back to
> the original source topic.
>
> So on restart message X gets read and processed again, and gets aggregated a second time
> into the same aggregate.
>
> Now that I know this is happening (it was spotted by what I thought was some over-the-top
> paranoid validation code) I can cope with it, and it is possible to make the aggregation operation
> idempotent, because of the structure of the particular operation I'm doing ... but what if
> the aggregation had been something like a simple counting or totalling operation? How would
> anyone know the original input message(s) had been aggregated more than once?
>
> So, my question:
>
> Am I correct in diagnosing that persisting the state store and committing the original
> source message are not carried out atomically, and one has to expect the same message can
> be applied to the same aggregate multiple times, and if one cares about this one has to detect
> it happening and make the aggregation process idempotent? I don't see this explained in the
> JavaDoc for either Aggregator (or Reducer, where presumably it also applies).
>
> Tim Ward
>
