kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From John Roesler <j...@confluent.io>
Subject Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Date Mon, 07 Jan 2019 20:10:46 GMT
Hi Peter,

Sorry, I just now have seen this thread.

You asked if this behavior is unexpected, and the answer is yes.
Suppress.untilWindowCloses is intended to emit only the final result,
regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since
it's not persistent.
The answer is the same as for in-memory stores. The state of the store (or
buffer, in this case)
is persisted to a changelog topic, which is re-read on restart to re-create
the exact state prior to shutdown.
"Persistent" in the store nomenclature refers only to "persistent on the
local disk".

Just to confirm your response regarding the buffer size:
While it is better to use the public ("Suppressed.unbounded()") API, yes,
your buffer was already unbounded.

I looked at your custom transfomer, and it looks almost correct to me. The
only flaw seems to be that it only looks
for closed windows for the key currently being processed, which means that
if you have key "A" buffered, but don't get another event for it for a
while after the window closes, you won't emit the final result. This might
actually take longer than the window retention period, in which case, the
data would be deleted without ever emitting the final result.

You said you think it should be possible to get the DSL version working,
and I agree, since this is exactly what it was designed for. Do you mind
filing a bug in the "KAFKA" Jira project (
https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
keep the investigation organized that way.

In the mean time, I'll take another look at your logs above and try to
reason about what could be wrong.

Just one clarification... For example, you showed
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental"
version, and the second is the "final" version? I think so, but am confused

Thanks for the report,

On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <peter.levart@gmail.com> wrote:

> On 12/21/18 3:16 PM, Peter Levart wrote:
> > I also see some results that are actual non-final window aggregations
> > that precede the final aggregations. These non-final results are never
> > emitted out of order (for example, no such non-final result would ever
> > come after the final result for a particular key/window).
> Absence of proof is not the proof of absence... And I have later
> observed (using the DSL variant, not the custom Transformer) an
> occurrence of a non-final result that was emited after restart of
> streams processor while the final result for the same key/window had
> been emitted before the restart:
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
> ...
> ... restart ...
> ...
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
> The app logic can not even rely on guarantee that results are ordered
> then. This is really not usable until the bug is fixed.
> Regards, Peter

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message