kafka-users mailing list archives

From Peter Levart <peter.lev...@gmail.com>
Subject Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted
Date Tue, 25 Dec 2018 22:52:10 GMT
Hello Guozhang,

Just wanted to say that I have managed to come up with a different 
solution that doesn't have these problems.

Instead of doing the following:

     kStream
         .groupByKey()
         .windowedBy(timeWindows)
         .aggregate(
             initializer,
             aggregator,
             Materialized.with(keySerde, resultValueSerde)
         )
         .suppress(
             Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
         )
         .toStream();

... I used a custom Transformer:

     kStream
         .transform(
             GroupByKeyWindowedTransformer.supplier(
                 timeWindows,
                 windowStoreName,
                 initializer,
                 aggregator
             ),
             windowStoreName
         );

... with some help from a persistent WindowStore:

     // create store
     StoreBuilder<WindowStore<K, VR>> windowStoreBuilder =
         Stores.windowStoreBuilder(
             Stores.persistentWindowStore(
                 windowStoreName,
                 retentionPeriod,
                 windowSize,
                 false // retainDuplicates = false
             ),
             keySerde,
             resultValueSerde
         );

     // register store
     builder.addStateStore(windowStoreBuilder);
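
Putting it together, the topology then looks roughly like this (just a
sketch; inputTopic, outputTopic and the serde variables stand for the
ones in my application). Note that unlike groupByKey(), transform()
does not trigger a repartition, so this assumes the input topic is
already partitioned by key:

     builder
         .stream(inputTopic, Consumed.with(keySerde, valueSerde))
         .transform(
             GroupByKeyWindowedTransformer.supplier(
                 timeWindows,
                 windowStoreName,
                 initializer,
                 aggregator
             ),
             windowStoreName
         )
         .to(outputTopic, Produced.with(windowedKeySerde, resultValueSerde));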


Here's the GroupByKeyWindowedTransformer implementation (imports
included for completeness):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

/**
 * @author Peter Levart
 */
public class GroupByKeyWindowedTransformer<K, V, VR>
        implements Transformer<K, V, KeyValue<Windowed<K>, VR>> {

     public static <K, V, VR>
     TransformerSupplier<K, V, KeyValue<Windowed<K>, VR>> supplier(
         TimeWindows windows,
         String windowStoreName,
         Initializer<VR> initializer,
         Aggregator<? super K, ? super V, VR> aggregator
     ) {
         VR zeroRes = initializer.apply();
         return () -> new GroupByKeyWindowedTransformer<>(
             windows,
             windowStoreName,
             initializer,
             aggregator,
             zeroRes
         );
     }

     private final TimeWindows windows;
     private final String windowStoreName;
     private final Initializer<VR> initializer;
     private final Aggregator<? super K, ? super V, VR> aggregator;
     private final VR zeroRes;

     private GroupByKeyWindowedTransformer(
         TimeWindows windows,
         String windowStoreName,
         Initializer<VR> initializer,
         Aggregator<? super K, ? super V, VR> aggregator,
         VR zeroRes
     ) {
         this.windows = windows;
         this.windowStoreName = windowStoreName;
         this.initializer = initializer;
         this.aggregator = aggregator;
         this.zeroRes = zeroRes;
     }

     private ProcessorContext context;
     private WindowStore<K, VR> store;

     @SuppressWarnings("unchecked")
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
         this.store =
             (WindowStore<K, VR>) context.getStateStore(windowStoreName);
     }

     @Override
     public KeyValue<Windowed<K>, VR> transform(K key, V value) {
         long ts = context.timestamp();

         // aggregate into windows
         for (TimeWindow tw : windows.windowsFor(ts).values()) {
             VR res = store.fetch(key, tw.start());
             if (!zeroRes.equals(res)) { // not flushed yet (see below)
                 if (res == null) res = initializer.apply();
                 res = aggregator.apply(key, value, res);
                 assert !zeroRes.equals(res);
                 store.put(key, res, tw.start());
             }
         }

         // flush windows that are overdue
         // (window start < this event's time - window size - grace period)
         try (WindowStoreIterator<VR> iter = store.fetch(
                 key, 0L, ts - windows.size() - windows.gracePeriodMs() - 1L)) {
             while (iter.hasNext()) {
                 KeyValue<Long, VR> kv = iter.next();
                 if (kv.value != null && !zeroRes.equals(kv.value)) {
                     TimeWindow tw =
                         new TimeWindow(kv.key, kv.key + windows.size());
                     context.forward(new Windowed<>(key, tw), kv.value);
                     // mark the slot flushed by writing the zero result value
                     store.put(key, zeroRes, kv.key);
                 }
             }
         }

         return null;
     }

     @Override
     public void close() {
     }
}
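
One way to sanity-check the flushing logic offline would be the
TopologyTestDriver from kafka-streams-test-utils. A minimal sketch,
assuming the wiring above with 1-second windows and a 5-second grace
period (ValSerializer and the val1/val2/val3 sample values are
placeholders): the third event pushes the first window past window
size + grace, so exactly one final result should come out:

     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transformer-test");
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

     try (TopologyTestDriver driver =
              new TopologyTestDriver(builder.build(), props)) {
         ConsumerRecordFactory<String, Val> factory =
             new ConsumerRecordFactory<>(
                 inputTopic, new StringSerializer(), new ValSerializer());

         // two events falling into the window [1000, 2000) ...
         driver.pipeInput(factory.create(inputTopic, "a", val1, 1_000L));
         driver.pipeInput(factory.create(inputTopic, "a", val2, 1_500L));
         // ... and one far enough ahead to make that window overdue
         driver.pipeInput(factory.create(inputTopic, "a", val3, 10_000L));

         // exactly one (final) result should have been forwarded
         ProducerRecord<byte[], byte[]> out = driver.readOutput(outputTopic);
     }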


With this sample code I don't even get duplicates in the output topic 
when the processor is restarted, and I don't get any non-final results 
of windowed aggregations either.

The question is whether such a transformer is correct (have I missed 
something?) and whether it is comparable to the DSL implementation above 
in terms of performance (I still have to test that).

Also, is it possible to make such a stream processor redundant 
(tolerant to the loss of the local window store), and if so, how?
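
My guess is that part of the answer is the changelog mechanism:
logging is enabled by default for persistent state stores, so the
window store should be rebuilt from its changelog topic after a local
loss. A sketch of making that explicit on the store builder (I still
have to verify that this covers all the restart scenarios):

     StoreBuilder<WindowStore<K, VR>> loggedWindowStoreBuilder =
         Stores.windowStoreBuilder(
             Stores.persistentWindowStore(
                 windowStoreName,
                 retentionPeriod,
                 windowSize,
                 false // retainDuplicates = false
             ),
             keySerde,
             resultValueSerde
         ).withLoggingEnabled(Collections.emptyMap()); // default changelog topic configs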

I still hope that the DSL variant can be made to work.

Regards, Peter


On 12/21/18 3:16 PM, Peter Levart wrote:
> Hello Guozhang,
>
> May I just add some more observations which might help you pinpoint 
> the problem...
>
> When the process that runs the kafka streams processing threads is 
> restarted, I can see duplicates in the output topic. That is 
> understandable with "at-least-once" semantics, and I don't mind 
> duplicates as long as they are duplicates of the final results of 
> window aggregations. My logic is prepared for that. But I also see 
> some results that are actual non-final window aggregations preceding 
> the final aggregations. These non-final results are never emitted out 
> of order (for example, no such non-final result ever comes after the 
> final result for a particular key/window).
>
> For example, here are some log fragments from a sample consumption of 
> the output topic where I detect either duplicates or "incremental 
> updates" of some key/window and mark them with the words "INSTEAD 
> OF". I only show incremental updates here:
>
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> 
> [14, 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> 
> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], 
> sum: 141164
>
> or:
>
> [pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> 
> [681, 116, 542, 543, 0, 0, 0, 0], sum: 143046
> [pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> 
> [681, 116, 542, 543, 0, 0, 0, 0, 0, 0, 0, 0] INSTEAD OF [681, 116, 
> 542, 543, 0, 0, 0, 0], sum: 143046
>
> The rule seems to be that the non-final result almost always 
> immediately precedes the final result in the log. I say almost, 
> because I also saw one occurrence of the following:
>
> [pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> 
> [756, 454, 547, 300, 323], sum: 166729
> [pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> 
> [193, 740, 660, 981], sum: 169303
> [pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> 
> [756, 454, 547, 300, 323, 421, 378, 354, 0] INSTEAD OF [756, 454, 547, 
> 300, 323], sum: 170456
> [pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> 
> [193, 740, 660, 981, 879, 209, 104, 0, 0, 0] INSTEAD OF [193, 740, 
> 660, 981], sum: 171648
>
> Here the incremental update of the key/window happened for two 
> consecutive 2-second windows in close succession, and the results 
> were intermingled.
>
> What you see in the above log before the window start/end timestamps 
> is a String key which is used in groupByKey (a, b, c, d). The input and 
> output topics have 4 partitions and I use 4 stream processing threads...
>
> Hope this helps you find the problem.
>
> So could this be considered a bug? I don't know how this suppression 
> is supposed to work, but it seems that it does not use any persistent 
> storage for the suppression buffer. So after the streams process is 
> restarted, it starts with a fresh buffer. What mechanisms are used to 
> guarantee that, in spite of that, suppress(untilWindowCloses) 
> suppresses non-final results?
>
> Regards, Peter
>
> On 12/21/18 10:48 AM, Peter Levart wrote:
>> Hello Guozhang,
>>
>> Thank you for looking into this problem.
>>
>> I noticed that I have been using an internal class constructor and 
>> later discovered the right API to create the StrictBufferConfig 
>> implementations. But I'm afraid that using your proposed factory 
>> method won't change anything since its implementation is as follows:
>>
>>         static StrictBufferConfig unbounded() {
>>             return new StrictBufferConfigImpl();
>>         }
>>
>> ...it creates an instance of the same class as my sample code below, 
>> so the program behaves the same...
>>
>> What does this mean? Was your suggestion meant to rule out any other 
>> possible causes while your suspicion still holds, or did you suspect 
>> that I was not using a suppression buffer of sufficient size?
>>
>> Regards, Peter
>>
>> On 12/21/18 1:58 AM, Guozhang Wang wrote:
>>> Hello Peter,
>>>
>>> Thanks for filing this report. I've looked into the source code and 
>>> I think I may have spotted an edge case that explains your 
>>> observations. To validate whether my suspicion is correct, could you 
>>> try modifying your DSL code a little bit to use a very large 
>>> suppression buffer size --- BTW, StrictBufferConfigImpl is an 
>>> internal class (you can tell by its name) and is not recommended for 
>>> use in your code. More specifically:
>>>
>>> Suppressed.untilWindowCloses(BufferConfig.unbounded())
>>>
>>> ------
>>>
>>> and see if this issue still exists?
>>>
>>>
>>> Guozhang
>>>
>>> On Wed, Dec 19, 2018 at 1:50 PM Peter Levart 
>>> <peter.levart@gmail.com> wrote:
>>>
>>>> I see the list processor managed to smash my beautifully formatted 
>>>> HTML message. For that reason I'm re-sending the sample code 
>>>> snippet in plain text mode...
>>>>
>>>>    Here's a sample Kafka Streams processor:
>>>>
>>>>         KStream<String, Val> input =
>>>>             builder
>>>>                 .stream(
>>>>                     inputTopic,
>>>>                     Consumed.with(Serdes.String(), new Val.Serde())
>>>>                             .withTimestampExtractor((rec, prevTs) -> {
>>>>                                 String key = (String) rec.key();
>>>>                                 Val val = (Val) rec.value();
>>>>                                 return Math.max(val.getTimestamp(),
>>>>                                                 Math.max(0L, prevTs - 4000));
>>>>                             })
>>>>                 );
>>>>
>>>>         KStream<Windowed<String>, IntegerList> grouped =
>>>>             input
>>>>                 .groupByKey()
>>>>                 .windowedBy(
>>>>                     TimeWindows.of(Duration.ofSeconds(1))
>>>>                                .advanceBy(Duration.ofSeconds(1))
>>>>                                .grace(Duration.ofSeconds(5))
>>>>                 )
>>>>                 .aggregate(
>>>>                     IntegerList::new,
>>>>                     (k, v, list) -> {
>>>>                         list.add(v.getValue());
>>>>                         return list;
>>>>                     },
>>>>                     Materialized.with(Serdes.String(), new IntegerList.Serde())
>>>>                 )
>>>>                 .suppress(
>>>>                     Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
>>>>                 )
>>>>                 .toStream();
>>>>
>>>>         grouped.to(
>>>>             outputTopic,
>>>>             Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
>>>>                           new IntegerList.Serde())
>>>>         );
>>>>
>>>>
>>>>
>>>> Regards, Peter
>>>>
>>>>
>>
>

