kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From John Roesler <j...@confluent.io>
Subject Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Date Thu, 10 Jan 2019 16:33:22 GMT
Hi Peter,

Regarding retention, I was not referring to log retention, but to the
window store retention.
Since a new window is created every second (for example), there are in
principle an unbounded
number of windows (the longer the application runs, the more windows there
are, with no end).
However, we obviously can't store an infinite amount of data, so the window
definition includes
a retention period. By default, this is 24 hours. After the retention
period elapses, all of the data
for the window is purged to make room for new windows.

So what I meant was that if you buffer some key "A" in window (Monday
09:00:00) and then get
no further activity for A for over 24 hours, then when you do get that next
event for A, say at
(Tuesday 11:00:00), you'd do the scan but find nothing, since your buffered
state would already
have been purged from the store.

The way I avoided this problem for Suppression was to organize the data by
timestamp instead
of by key, so on *every* update I can search for all the keys that are old
enough and emit them.
I also don't use a window store, so I don't have to worry about the
retention time.

To answer your question about the window store's topic, it configures a
retention time the same
length as the store's retention time, (and they keys are the full windowed
key including the window
start time), so it'll have roughly the same size bound as the store itself.

Back to the process of figuring out what might be wrong with Suppression, I
don't suppose you
would be able to file a Jira and upload a repro program? If not, that's ok.
I haven't been able to
reproduce the bug yet, but it seems like it's happening somewhat
consistently for you, so I should
be able to get it to happen eventually.

Thanks, and sorry again for the troubles.

On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <peter.levart@gmail.com> wrote:

> On 1/8/19 12:57 PM, Peter Levart wrote:
> > Hi John,
> >
> > On 1/8/19 12:45 PM, Peter Levart wrote:
> >>> I looked at your custom transfomer, and it looks almost correct to
> >>> me. The
> >>> only flaw seems to be that it only looks
> >>> for closed windows for the key currently being processed, which
> >>> means that
> >>> if you have key "A" buffered, but don't get another event for it for a
> >>> while after the window closes, you won't emit the final result. This
> >>> might
> >>> actually take longer than the window retention period, in which
> >>> case, the
> >>> data would be deleted without ever emitting the final result.
> >>
> >> So in DSL case, the suppression works by flushing *all* of the "ripe"
> >> windows in the whole buffer whenever a singe event comes in with
> >> recent enough timestamp regardless of the key of that event?
> >>
> >> Is the buffer shared among processing tasks or does each task
> >> maintain its own private buffer that only contains its share of data
> >> pertaining to assigned input partitions? In case the tasks are
> >> executed on several processing JVM(s) the buffer can't really be
> >> shared, right? In that case a single event can't flush all of the
> >> "ripe" windows, but just those that are contained in the task's part
> >> of buffer...
> >
> > Just a question about your comment above:
> >
> > /"This might actually take longer than the window retention period, in
> > which case, the data would be deleted without ever emitting the final
> > result"/
> >
> > Are you talking about the buffer log topic retention? Aren't log
> > topics configured to "compact" rather than "delete" messages? So the
> > last "version" of the buffer entry for a particular key should stay
> > forever? What are the keys in suppression buffer log topic? Are they a
> > pair of (timestamp, key) ? Probably not since in that case the
> > compacted log would grow indefinitely...
> >
> > Another question:
> >
> > What are the keys in WindowStore's log topic? If the input keys to the
> > processor that uses such WindowStore consist of a bounded set of
> > values (for example user ids), would compacted log of such WindowStore
> > also be bounded?
> In case the key of WindowStore log topic is (timestamp, key) then would
> explicitly deleting flushed entries from WindowStore (by putting null
> value into the store) keep the compacted log bounded? In other words,
> does WindowStore log topic support a special kind of "tombstone" message
> that effectively removes the key from the compacted log?
> In that case, my custom processor could keep entries in its WindowStore
> for as log as needed, depending on the activity of a particular input
> key...
> >
> > Regards, Peter
> >
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message