kafka-users mailing list archives

From Nicolas Fouché <nfou...@onfocus.io>
Subject Re: Dismissing late messages in Kafka Streams
Date Thu, 20 Oct 2016 12:51:44 GMT
I forgot to mention that the default maintain duration of a window is 1
day. Would it be useful to warn the developer if the current maintain
duration is "not compatible" with the current window size and interval?
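
To make the idea of such a warning concrete, here is a minimal sketch in
plain Python. It is not the Kafka Streams API: `window_starts` and
`retention_warning` are hypothetical helpers, and the rule "retention
shorter than the window size is suspicious" is an assumption drawn from the
behaviour described in this thread. It also assumes windows are aligned to
multiples of the hop, counted from epoch 0.

```python
DAY_MS = 24 * 60 * 60 * 1000


def window_starts(timestamp_ms, size_ms, advance_ms):
    """Start times of the hopping windows [start, start + size) that contain
    timestamp_ms, assuming starts are multiples of advance_ms from epoch 0."""
    first = max(0, timestamp_ms - size_ms + advance_ms) // advance_ms * advance_ms
    return list(range(first, timestamp_ms + 1, advance_ms))


def retention_warning(size_ms, retention_ms):
    """Return a warning string if the retention (maintain duration) is
    shorter than the window size, otherwise None."""
    if retention_ms < size_ms:
        return (
            "maintain duration (%d ms) is shorter than the window size (%d ms); "
            "older windows may be dropped while they still receive records"
            % (retention_ms, size_ms)
        )
    return None
```

With a 15-day window and a 1-day hop, `window_starts` returns 15 window
starts per record, and `retention_warning(15 * DAY_MS, 1 * DAY_MS)` would
flag the default 1-day maintain duration.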

2016-10-20 14:49 GMT+02:00 Nicolas Fouché <nfouche@onfocus.io>:

> Hi Michael,
>
> thanks for the quick reply. Let's try to explain things a bit better:
>
> Within a call to `aggregateByKey`, I specified a window with the size of
> 15 days and an interval (hop) of 1 day, without setting a maintain duration.
> I produced a message, the timestamp being the current clock time, and
> looked at the messages produced in my final output topic: 15 messages were
> there, as expected, with a first aggregation.
> Then I produced a second message, and 15 messages were also produced, but:
> only the 2 messages from the 2 most recent windows had an updated
> aggregate. The 13 other ones contained the result of an initial
> aggregation. For example, if the aggregation was a simple counter, I would
> have 2 times a value of *2*, and 13 times a value of *1*.
> Producing a third message: I would have 2 times a value of *3*, and 13
> times a value of *1*.
> etc.
>
> Then I wondered about this "maintain duration thing". I changed it to 30
> days, and then all went well. The counter of every window was incremented
> normally.
>
> So my conclusion was: an aggregate computed in a window that *started*
> before (now minus the maintain duration) is automatically dropped.
>
> To the problem:
> this aggregate is pushed to a topic, and this topic is consumed by Kafka
> Connect, to end up in Aerospike by replacing an existing record (
> https://github.com/aerospike/aerospike-kafka-connector).
> So if I make a mistake and send a message with a 40 days old timestamp, a
> new aggregate will be generated for this old window, and the new aggregate
> will overwrite a record in Aerospike.
>
> The question is then: I want to prevent my topology from accepting these
> 30+ day old messages, to avoid destroying data in the final database. Of
> course I can call `filter` to ignore old messages, but I would then have a
> window definition in one place in my code, and in another place a filter
> which would absolutely need to consider the window maintain duration. It
> would start to get messy with complex topologies. You get the idea: I
> wonder whether a developer has to write this code, or whether a config
> somewhere would help.
>
> This whole behaviour, which I totally accept as "by-design", is as far as
> I know undocumented. And well, I find it quite harmful. Or did I miss
> something ?
>
> Finally, about "repairing":
> If I want to "repair" my aggregates by re-producing all my old messages,
> knowing about this window maintain time is essential. I have to change my
> code (or my config file) to ensure that my window maintain times are long
> enough.
>
> Thanks for sharing [1], I already read it. I guess I'll ask more specific
> questions when the time for a repair happens.
>
> [1]
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
>
>
> 2016-10-20 10:37 GMT+02:00 Michael Noll <michael@confluent.io>:
>
>> Nicolas,
>>
>> > I set the maintain duration of the window to 30 days.
>> > If it consumes a message older than 30 days, then a new aggregate is
>> > created for this old window.
>>
>> I assume you mean:  If a message should have been included in the original
>> ("old") window but that message happens to arrive late (after the
>> "original" 30 days), then a new aggregate is created for this old window?
>> I wanted to ask this first because answering your questions depends on
>> what exactly you mean here.
>>
>>
>> > The problem is that this old windowed aggregate is of course incomplete
>> > and will overwrite a record in the final database.
>>
>> Not sure I understand -- why would the old windowed aggregate be
>> incomplete?  Could you explain a bit more what you mean?
>>
>>
>> > By the way, is there any article about replaying old messages. Some tips
>> > and tricks, like "you'd better do that in another deployment of your
>> > topology", and/or "you'd better use topics dedicated to repair".
>>
>> I am not aware of a deep dive article or docs on that just yet.  There's a
>> first blog post [1] about Kafka's new Application Reset Tool that goes in
>> this direction, but it is only a first step toward
>> replaying/reprocessing old messages.  Do you have specific questions
>> here that we can help you with in the meantime?
>>
>> [1]
>> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>>
>> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché <nfouche@onfocus.io>
>> wrote:
>>
>> > Hi,
>> >
>> > I aggregate some data with `aggregateByKey` and a `TimeWindows`.
>> >
>> > I set the maintain duration of the window to 30 days.
>> > If it consumes a message older than 30 days, then a new aggregate is
>> > created for this old window.
>> > The problem is that this old windowed aggregate is of course incomplete
>> > and will overwrite a record in the final database.
>> >
>> > So is there a way to dismiss these old messages ?
>> >
>> > I only see the point of accepting old messages when the topology is
>> > launched in "repair" mode.
>> > By the way, is there any article about replaying old messages. Some tips
>> > and tricks, like "you'd better do that in another deployment of your
>> > topology", and/or "you'd better use topics dedicated to repair".
>> >
>> > Thanks
>> > Nicolas
>> >
>>
>
>
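
For reference, the "filter that must know the maintain duration" concern
from the thread can be sketched as follows, again in plain Python rather
than the Kafka Streams API. `WindowSpec`, `accepts` and `drop_late` are
hypothetical names. The point is only that the window definition and the
late-message cutoff are derived from one object, so they cannot drift
apart. The cutoff used here (record timestamp older than now minus the
maintain duration) is an assumption taken from the question as asked; a
stricter variant could compare the record's earliest window start instead.

```python
from dataclasses import dataclass

DAY_MS = 24 * 60 * 60 * 1000


@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical holder: one definition shared by the window and the filter."""
    size_ms: int
    advance_ms: int
    retention_ms: int

    def accepts(self, record_ts_ms, now_ms):
        # Accept a record only if it is no older than the maintain duration.
        return record_ts_ms >= now_ms - self.retention_ms


# Values from the thread: 15-day window, 1-day hop, 30-day maintain duration.
SPEC = WindowSpec(size_ms=15 * DAY_MS, advance_ms=DAY_MS, retention_ms=30 * DAY_MS)


def drop_late(records, now_ms, spec=SPEC):
    """Filter an iterable of (timestamp_ms, value) pairs, keeping only the
    records that the spec accepts at time now_ms."""
    return [(ts, value) for ts, value in records if spec.accepts(ts, now_ms)]
```

In a real topology the same `accepts` check would sit in the `filter` step
ahead of the aggregation, with `now_ms` taken from whatever clock the
application treats as current time.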
