kafka-users mailing list archives

From Peter Levart <peter.lev...@gmail.com>
Subject KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Date Wed, 19 Dec 2018 21:39:25 GMT
Hello,

I'm trying to use Kafka Streams to aggregate some time-series data using
1-second tumbling time windows. The data is ordered approximately by
timestamp, with some "jitter", which I limit at the input with a custom
TimestampExtractor. It moves events into the future if they arrive too
late, which guarantees that the timestamp of each event never jumps back
by more than 4 seconds relative to the most recent previous event
timestamp. I then give the tumbling windows a grace period of 5 seconds...

Here's a sample kafka streams processor:

    KStream<String, Val> input = builder.stream(inputTopic,
        Consumed.with(Serdes.String(), new Val.Serde())
            // never let a timestamp fall more than 4 s behind the previous one
            .withTimestampExtractor((rec, prevTs) -> {
                Val val = (Val) rec.value();
                return Math.max(val.getTimestamp(), Math.max(0L, prevTs - 4000));
            }));

    KStream<Windowed<String>, IntegerList> grouped = input
        .groupByKey()
        // 1 s tumbling windows with a 5 s grace period
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
            .advanceBy(Duration.ofSeconds(1))
            .grace(Duration.ofSeconds(5)))
        .aggregate(IntegerList::new,
            (k, v, list) -> { list.add(v.getValue()); return list; },
            Materialized.with(Serdes.String(), new IntegerList.Serde()))
        // emit only each window's final result; BufferConfig.unbounded() is the
        // public factory for the same unbounded StrictBufferConfigImpl
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream();

    grouped.to(outputTopic,
        Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde()));


I'm using KTable.suppress with Suppressed.untilWindowCloses to suppress
all but the final versions of the aggregations. This works as expected: I
get only one final result per grouping key and window instance in the
output topic. But it only works as expected and advertised until I
restart the Kafka Streams process while the events are being aggregated.
After a restart, I can see some non-final versions of aggregations in
the output topic, followed by the final versions. So the guarantee
advertised by Suppressed.untilWindowCloses(), which says:

"This option is suitable for use cases in which the business logic
requires a hard guarantee that only the final result is propagated."

...holds only when the Kafka Streams process is not restarted. Is this
expected behavior, or is it a bug?
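One way to look for the symptom is to read the output topic back and flag every key/window that was emitted with more than one distinct aggregate. The following is a minimal sketch, assuming the records have already been deserialized; FinalResultCheck, WindowId and Emitted are hypothetical names, not part of the Kafka Streams API. It compares values rather than just counting records, because under at-least-once processing an identical copy of the final result may be re-sent after a restart.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams: checks decoded output records
// for windows that were emitted with more than one distinct aggregate.
public class FinalResultCheck {
    // Hypothetical identifier of one window instance: grouping key + window start (ms).
    record WindowId(String key, long windowStartMs) {}

    // Hypothetical decoded output record: the window and its aggregated values.
    record Emitted(WindowId window, List<Integer> values) {}

    // Returns the windows for which more than one distinct aggregate was emitted,
    // i.e. at least one of them was not the final result. Identical copies are
    // ignored, since at-least-once processing may re-send the same final result.
    static Set<WindowId> windowsWithNonFinalResults(List<Emitted> output) {
        Map<WindowId, Set<List<Integer>>> distinct = new HashMap<>();
        for (Emitted e : output) {
            distinct.computeIfAbsent(e.window(), w -> new HashSet<>()).add(e.values());
        }
        Set<WindowId> flagged = new HashSet<>();
        distinct.forEach((w, values) -> {
            if (values.size() > 1) {
                flagged.add(w);
            }
        });
        return flagged;
    }

    public static void main(String[] args) {
        List<Emitted> output = List.of(
            new Emitted(new WindowId("a", 0L), List.of(3)),
            new Emitted(new WindowId("a", 0L), List.of(3)),         // identical copy: not flagged
            new Emitted(new WindowId("a", 1_000L), List.of(1)),     // partial aggregate...
            new Emitted(new WindowId("a", 1_000L), List.of(1, 2))); // ...then the final one
        System.out.println(windowsWithNonFinalResults(output));
    }
}
```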

Thanks,

Peter Levart

