kafka-users mailing list archives

From Peter Levart <peter.lev...@gmail.com>
Subject Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Date Tue, 08 Jan 2019 11:45:41 GMT
Hi John,

On 1/7/19 9:10 PM, John Roesler wrote:
> Hi Peter,
>
> Sorry, I have only just now seen this thread.
>
> You asked if this behavior is unexpected, and the answer is yes.
> Suppressed.untilWindowCloses is intended to emit only the final result,
> regardless of restarts.
>
> You also asked how the suppression buffer can resume after a restart, since
> it's not persistent.
> The answer is the same as for in-memory stores. The state of the store (or
> buffer, in this case) is persisted to a changelog topic, which is re-read on
> restart to re-create the exact state prior to shutdown.
> "Persistent" in the store nomenclature refers only to "persistent on the
> local disk".
>
> Just to confirm your response regarding the buffer size:
> While it is better to use the public ("Suppressed.BufferConfig.unbounded()")
> API, yes, your buffer was already unbounded.
>
> I looked at your custom transformer, and it looks almost correct to me. The
> only flaw seems to be that it only looks for closed windows for the key
> currently being processed, which means that if you have key "A" buffered,
> but don't get another event for it for a while after the window closes, you
> won't emit the final result. This might actually take longer than the window
> retention period, in which case the data would be deleted without ever
> emitting the final result.
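
To illustrate John's point that an in-memory (not "persistent") buffer can still survive a restart, here is a minimal, Kafka-free sketch. It assumes a plain list standing in for the changelog topic and a null value standing in for a tombstone; the class and method names (ChangelogBackedBuffer, put, remove, contents) and the keys are hypothetical and are not the Kafka Streams internals. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free model of an in-memory store whose every change is
// also appended to a changelog; a "restart" rebuilds the map by replaying it.
public class ChangelogBufferDemo {

    // One changelog entry; a null value plays the role of a tombstone (delete).
    public record Change(String key, String value) {}

    public static class ChangelogBackedBuffer {
        private final Map<String, String> memory = new HashMap<>();
        private final List<Change> changelog; // stands in for the changelog topic

        public ChangelogBackedBuffer(List<Change> changelog) {
            this.changelog = changelog;
            // Restore: replay the changelog in order to re-create the prior state.
            for (Change c : changelog) {
                if (c.value() == null) {
                    memory.remove(c.key());
                } else {
                    memory.put(c.key(), c.value());
                }
            }
        }

        public void put(String key, String value) {
            memory.put(key, value);
            changelog.add(new Change(key, value));
        }

        public void remove(String key) {
            memory.remove(key);
            changelog.add(new Change(key, null)); // tombstone
        }

        public Map<String, String> contents() {
            return Map.copyOf(memory);
        }
    }

    public static void main(String[] args) {
        List<Change> topic = new ArrayList<>(); // outlives the "process"
        ChangelogBackedBuffer before = new ChangelogBackedBuffer(topic);
        before.put("a@0/2000", "[550]");
        before.put("a@0/2000", "[550, 81]");
        before.put("c@0/2000", "[14]");
        before.remove("c@0/2000"); // e.g. final result emitted, entry evicted

        // "Restart": the in-memory map is gone, only the changelog remains.
        ChangelogBackedBuffer after = new ChangelogBackedBuffer(topic);
        System.out.println(after.contents());
    }
}
```

The restored buffer holds only the latest value for "a@0/2000"; the evicted "c@0/2000" stays gone because its tombstone is replayed too.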

So in the DSL case, the suppression works by flushing *all* of the "ripe"
windows in the whole buffer whenever a single event comes in with a recent
enough timestamp, regardless of the key of that event?

Is the buffer shared among processing tasks, or does each task maintain
its own private buffer that only contains its share of the data pertaining
to its assigned input partitions? If the tasks are executed in several
processing JVMs, the buffer can't really be shared, right? In that case
a single event can't flush all of the "ripe" windows, only those
contained in the task's part of the buffer...
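
The flushing behavior discussed above can be modeled roughly as follows. This is a hypothetical, Kafka-free sketch, not the actual Kafka Streams implementation: it assumes that each task keeps a private buffer, tracks its own stream time as the maximum timestamp it has seen, and treats a window as closed once window end + grace <= stream time. Unlike the per-key check John describes in the custom transformer, every closed window in the task's buffer is flushed on each record, whatever its key. The class name, the "key@windowStart" naming, and the windowed-sum aggregation are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ONE task's windowed sum followed by an
// "until window closes" buffer. Not Kafka Streams code.
public class TaskSuppressionBuffer {

    record Pending(long windowEnd, long sum) {}

    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // max timestamp seen by this task
    // Insertion-ordered so flushed results come out in a stable order.
    private final Map<String, Pending> buffer = new LinkedHashMap<>();

    public TaskSuppressionBuffer(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Processes one record and returns the final results it made "ripe". */
    public List<String> process(String key, long timestamp, long value) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;

        // A record whose window has already closed is dropped (too late).
        if (windowEnd + graceMs > streamTime) {
            buffer.merge(key + "@" + windowStart, new Pending(windowEnd, value),
                    (old, add) -> new Pending(old.windowEnd(), old.sum() + add.sum()));
        }

        // Flush EVERY closed window in this task's buffer, whatever its key.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> e = it.next();
            if (e.getValue().windowEnd() + graceMs <= streamTime) {
                emitted.add(e.getKey() + " -> " + e.getValue().sum());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two tasks, each with a private buffer and its own stream time.
        TaskSuppressionBuffer task0 = new TaskSuppressionBuffer(2000, 1000);
        TaskSuppressionBuffer task1 = new TaskSuppressionBuffer(2000, 1000);

        task0.process("a", 0, 5);
        task0.process("a", 1500, 7);
        task1.process("b", 100, 3);

        // A record for key "x" moves task0's stream time to 3000 (= 2000 + 1000),
        // so "a@0" is flushed although "a" receives no further events.
        System.out.println(task0.process("x", 3000, 1));

        // task1's stream time only reaches 2500, so "b@0" stays buffered.
        System.out.println(task1.process("b", 2500, 1));
    }
}
```

Under these assumptions, a record arriving at task0 never flushes windows held by task1, which matches the intuition in the question above.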

>
> You said you think it should be possible to get the DSL version working,
> and I agree, since this is exactly what it was designed for. Do you mind
> filing a bug in the "KAFKA" Jira project (
> https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to
> keep the investigation organized that way.

Will do that.

>
> In the meantime, I'll take another look at your logs above and try to
> reason about what could be wrong.
>
> Just one clarification... For example, you showed
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
>> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
>
> Am I correct in thinking that the first, shorter list is the "incremental"
> version, and the second is the "final" version? I think so, but am confused
> by "INSTEAD OF".

It's the other way around. The first list (usually the longer one) is what
has just been consumed, and the second is what had been consumed before
that for the same key. (I maintain a ConcurrentHashMap of consumed
entries in the test and execute: secondList = map.put(key, firstList)...)

In the majority of cases, the consumed list is an incremental update of
some previous version of the list (not necessarily its direct predecessor)
consumed before it. But, as I said, I also observed the final window
result before the processor restart and, after the restart, some previous
version of the non-final window aggregation for the same key.
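
The consumer-side check described above can be sketched like this. The ConcurrentHashMap and the map.put(key, list) idea come from the message; the class name, method name, log format, and the "REGRESSION" heuristic (treating a new list that is a strict prefix of the previous one as an older, non-final version arriving late) are hypothetical additions for illustration. The "sum" field from the logs is left out because the message does not say how it is computed.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reconstruction of the test's consumer-side bookkeeping.
public class ConsumedResultsCheck {

    private final Map<String, List<Integer>> consumed = new ConcurrentHashMap<>();

    /**
     * Records the list consumed for a key/window and returns a log line.
     * "INSTEAD OF" marks a repeat for the same key/window; "(REGRESSION)"
     * marks a new list that is a strict prefix of the earlier one.
     */
    public String onConsumed(String windowedKey, List<Integer> current) {
        // put() returns whatever had been consumed before for this key/window.
        List<Integer> previous = consumed.put(windowedKey, current);
        String line = windowedKey + " -> " + current;
        if (previous != null) {
            line += " INSTEAD OF " + previous;
            boolean regression = previous.size() > current.size()
                    && previous.subList(0, current.size()).equals(current);
            if (regression) {
                line += " (REGRESSION)";
            }
        }
        return line;
    }

    public static void main(String[] args) {
        ConsumedResultsCheck check = new ConsumedResultsCheck();
        String key = "a@1545815260000/1545815262000";
        // Final result seen before the restart (values taken from the logs below).
        System.out.println(check.onConsumed(key,
                List.of(550, 81, 18, 393, 968, 847, 452, 0, 0, 0)));
        // After the restart, an earlier non-final version shows up again.
        System.out.println(check.onConsumed(key, List.of(550)));
    }
}
```

A list that grows (for example [550] followed by [550, 81]) is reported as a plain "INSTEAD OF" update, while the post-restart [550] is flagged as a regression.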

May I also note that there is some "jitter" in the input timestamps,
because I'm trying to model a real use case where there will be several
inputs to the system with only approximately synchronized clocks. The
jitter is kept well below the TimeWindows grace period, so no event
consumed by the processor should belong to a window that has already
been flushed.
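
Why jitter below the grace period should be harmless can be checked with a small simulation. It is a hypothetical sketch, not Kafka code: it assumes records from all inputs arrive in true-time order, that each input's clock is shifted by a fixed offset, and that a record is dropped when its window end + grace <= stream time (the maximum timestamp seen so far). The window size, grace values, and offsets are arbitrary example values.

```java
// Hypothetical simulation of several inputs with skewed clocks feeding one
// windowed aggregation; counts records that land in an already-closed window.
public class JitterVsGrace {

    public static int countLate(long[] clockOffsetsMs, long windowSizeMs, long graceMs,
                                long startMs, long stepMs, int steps) {
        long streamTime = Long.MIN_VALUE;
        int late = 0;
        for (int i = 0; i < steps; i++) {
            long trueTime = startMs + i * stepMs;
            // Each input emits one record per step, stamped with its own clock.
            for (long offset : clockOffsetsMs) {
                long ts = trueTime + offset;
                streamTime = Math.max(streamTime, ts);
                long windowEnd = ts - Math.floorMod(ts, windowSizeMs) + windowSizeMs;
                if (windowEnd + graceMs <= streamTime) {
                    late++; // this record's window had already closed
                }
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] offsets = {-300, 0, 400}; // max skew between inputs: 700 ms
        // Grace (1000 ms) larger than the skew: no record is late.
        System.out.println(countLate(offsets, 2000, 1000, 10_000, 100, 200));
        // No grace: records from the "slow" clock near a window boundary are late.
        System.out.println(countLate(offsets, 2000, 0, 10_000, 100, 200));
    }
}
```

The reasoning behind the first case: stream time can be at most the maximum skew ahead of a record's own timestamp, and the record's window ends after that timestamp, so any grace period larger than the skew keeps every record on time.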

Regards, Peter

>
> Thanks for the report,
> -John
>
>
>
> On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <peter.levart@gmail.com> wrote:
>
>>
>> On 12/21/18 3:16 PM, Peter Levart wrote:
>>> I also see some results that are actual non-final window aggregations
>>> that precede the final aggregations. These non-final results are never
>>> emitted out of order (for example, no such non-final result would ever
>>> come after the final result for a particular key/window).
>> Absence of proof is not proof of absence... And I later observed
>> (using the DSL variant, not the custom Transformer) an occurrence of
>> a non-final result that was emitted after a restart of the streams
>> processor, while the final result for the same key/window had been
>> emitted before the restart:
>>
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
>> ...
>> ... restart ...
>> ...
>> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550] INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>>
>>
>> So the app logic cannot even rely on a guarantee that results are
>> ordered. This is really not usable until the bug is fixed.
>>
>> Regards, Peter
>>
>>

