kafka-users mailing list archives

From Nicolas Fouché <nfou...@onfocus.io>
Subject Re: Dismissing late messages in Kafka Streams
Date Thu, 20 Oct 2016 12:49:09 GMT
Hi Michael,

Thanks for the quick reply. Let me try to explain things a bit better:

Within a call to `aggregateByKey`, I specified a window with a size of 15
days and an advance (hop) of 1 day, without setting a maintain duration.
I produced a message whose timestamp was the current clock time, and
looked at the messages produced in my final output topic: 15 messages were
there, as expected, each holding a first aggregation.
Then I produced a second message, and 15 messages were again produced, but
only the 2 messages from the 2 most recent windows had an updated
aggregate. The 13 other ones still contained the result of an initial
aggregation. For example, if the aggregation were a simple counter, I would
see the value *2* twice, and the value *1* 13 times.
After producing a third message, I would see the value *3* twice, and the
value *1* 13 times.
And so on.
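To make the numbers concrete, here is a small plain-Java sketch of why one record should land in 15 overlapping hopping windows (the alignment formula is my own reconstruction for illustration, not Streams' actual code; the sizes are the ones from my setup):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class HoppingWindows {
    static final long SIZE_MS = TimeUnit.DAYS.toMillis(15);   // window size
    static final long ADVANCE_MS = TimeUnit.DAYS.toMillis(1); // hop

    // Start timestamps of all hopping windows containing the given record
    // timestamp, i.e. all aligned starts s with s <= ts < s + SIZE_MS.
    static List<Long> windowStartsFor(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, timestampMs - SIZE_MS + ADVANCE_MS);
        first -= first % ADVANCE_MS; // align down to a multiple of the hop
        for (long start = first; start <= timestampMs; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = TimeUnit.DAYS.toMillis(100); // some record timestamp
        // 15 windows touched -> 15 records in the output topic per input message
        System.out.println(windowStartsFor(ts).size());
    }
}
```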

Then I wondered about this "maintain duration" thing. I changed it to 30
days, and then all went well: the counter of every window was incremented
normally.

So my conclusion was: an aggregate computed in a window that *started*
before now minus the maintain duration is automatically dropped.
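If that conclusion is right, the drop condition would look roughly like the following (plain-Java sketch; `streamTimeMs` stands in for whatever internal notion of current time Streams uses, which I am guessing at):

```java
import java.util.concurrent.TimeUnit;

public class RetentionCheck {
    // The window maintain duration ("until"), 30 days in my second test.
    static final long MAINTAIN_MS = TimeUnit.DAYS.toMillis(30);

    // A window can still be updated only while its start is within the
    // maintain duration; older windows are dropped.
    static boolean windowExpired(long windowStartMs, long streamTimeMs) {
        return windowStartMs < streamTimeMs - MAINTAIN_MS;
    }

    public static void main(String[] args) {
        long now = TimeUnit.DAYS.toMillis(365);
        // a window started 40 days ago: dropped
        System.out.println(windowExpired(now - TimeUnit.DAYS.toMillis(40), now));
        // a window started 10 days ago: kept
        System.out.println(windowExpired(now - TimeUnit.DAYS.toMillis(10), now));
    }
}
```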

To the problem:
this aggregate is pushed to a topic, and this topic is consumed by Kafka
Connect, ending up in Aerospike by replacing an existing record (
https://github.com/aerospike/aerospike-kafka-connector).
So if I make a mistake and send a message with a 40-day-old timestamp, a
new aggregate will be generated for this old window, and that new aggregate
will overwrite a record in Aerospike.

The question is then: I want to prevent my topology from accepting these
30+-day-old messages, to avoid destroying data in the final database. Of
course I can call `filter` to ignore old messages, but then I would have a
window definition in one place in my code, and in another place a filter
which would absolutely need to stay in sync with the window maintain
duration. That would get messy with complex topologies. You get the idea: I
wonder whether a developer has to write this code, or whether a config
setting somewhere would help.
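For what it's worth, the workaround I have in mind would share a single constant between the window definition and the filter, along these lines (a sketch only; the comparison against wall-clock time is an assumption about how "too old" should be decided, and the `filter` wiring shown in the comment is from memory):

```java
import java.util.concurrent.TimeUnit;

public class LateRecordFilter {
    // Single source of truth: used both when building the TimeWindows
    // (its maintain duration) and by the filter predicate below.
    static final long MAINTAIN_MS = TimeUnit.DAYS.toMillis(30);

    // Intended to be plugged into the topology before the aggregation, e.g.:
    //   stream.filter((key, value) ->
    //       !LateRecordFilter.isTooOld(value.timestampMs, System.currentTimeMillis()))
    static boolean isTooOld(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs > MAINTAIN_MS;
    }

    public static void main(String[] args) {
        long now = TimeUnit.DAYS.toMillis(365);
        // a 40-day-old record would be rejected before it can resurrect a window
        System.out.println(isTooOld(now - TimeUnit.DAYS.toMillis(40), now));
    }
}
```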

This whole behaviour, which I fully accept as "by design", is as far as I
know undocumented. And, well, I find it quite harmful. Or did I miss
something?

Finally, about "repairing":
if I want to "repair" my aggregates by re-producing all my old messages,
knowing about this window maintain duration is essential. I have to change
my code (or my config file) to ensure that my window maintain durations are
long enough.
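Concretely, before a replay I would have to compute something like the following (again a sketch under my assumptions about the drop rule, assuming I know the oldest timestamp I will re-produce):

```java
import java.util.concurrent.TimeUnit;

public class RepairRetention {
    // Minimum maintain duration so that re-produced records as old as
    // oldestTimestampMs still land in live (not yet dropped) windows.
    static long requiredMaintainMs(long oldestTimestampMs, long nowMs,
                                   long windowSizeMs) {
        // The oldest record belongs to windows starting as far back as
        // oldestTimestampMs - windowSizeMs; all of those must stay alive.
        return nowMs - (oldestTimestampMs - windowSizeMs);
    }

    public static void main(String[] args) {
        long now = TimeUnit.DAYS.toMillis(400);
        long oldest = now - TimeUnit.DAYS.toMillis(60); // oldest replayed record
        long required = requiredMaintainMs(oldest, now, TimeUnit.DAYS.toMillis(15));
        // 60 days of history + 15 days of window size = 75 days
        System.out.println(TimeUnit.MILLISECONDS.toDays(required));
    }
}
```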

Thanks for sharing [1]; I had already read it. I guess I'll ask more
specific questions when the time for a repair comes.

[1]
http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application



2016-10-20 10:37 GMT+02:00 Michael Noll <michael@confluent.io>:

> Nicolas,
>
> > I set the maintain duration of the window to 30 days.
> > If it consumes a message older than 30 days, then a new aggregate is
> created
> for this old window.
>
> I assume you mean:  If a message should have been included in the original
> ("old") window but that message happens to arrive late (after the
> "original" 30 days), then a new aggregate is created for this old window?
> I wanted to ask this first because answering your questions depends on what
> exactly you mean here.
>
>
> > The problem is that this old windowed aggregate is of course incomplete
> and
> > will overwrite a record in the final database.
>
> Not sure I understand -- why would the old windowed aggregate be
> incomplete?  Could you explain a bit more what you mean?
>
>
> > By the way, is there any article about replaying old messages. Some tips
> > and tricks, like "you'd better do that in another deployment of your
> > topology", and/or "you'd better use topics dedicated to repair".
>
> I am not aware of a deep dive article or docs on that just yet.  There's a
> first blog post [1] about Kafka's new Application Reset Tool that goes into
> this direction, but this is only a first step into the direction of
> replaying/reprocessing of old messages.  Do you have specific questions
> here that we can help you with in the meantime?
>
> [1]
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
>
>
>
>
>
> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché <nfouche@onfocus.io>
> wrote:
>
> > Hi,
> >
> > I aggregate some data with `aggregateByKey` and a `TimeWindows`.
> >
> > I set the maintain duration of the window to 30 days.
> > If it consumes a message older than 30 days, then a new aggregate is
> > created for this old window.
> > The problem is that this old windowed aggregate is of course incomplete
> and
> > will overwrite a record in the final database.
> >
> > So is there a way to dismiss these old messages ?
> >
> > I only see the point of accepting old messages when the topology is
> > launched in "repair" mode.
> > By the way, is there any article about replaying old messages. Some tips
> > and tricks, like "you'd better do that in another deployment of your
> > topology", and/or "you'd better use topics dedicated to repair".
> >
> > Thanks
> > Nicolas
> >
>
