kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Whey does the window final result is not emitted after the window has elapsed?
Date Wed, 02 Jan 2019 14:27:40 GMT
> After some time, the window closes.

This is not correct. Windows are based on event-time, and because no new
input record is processed, the window is not closed. That is the reason
why you don't get any output. Only a new input record can advance
"stream time" and close the window.

In practice, when data flows continuously, this should not be an issue
though.
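
To make the point about "stream time" concrete, here is a minimal
stand-alone sketch. It is not Kafka's KTableSuppressProcessor; the class
SuppressBufferSketch and all of its members are hypothetical names made up
for illustration. It only assumes what is described in this thread: stream
time is the highest record timestamp seen so far, emission is checked only
inside process(), and a window counts as closed once stream time reaches
window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a window-final suppression buffer.
// It mimics the buffer-then-enforce pattern only; it is not Kafka code.
class SuppressBufferSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // Stream time: the highest record timestamp observed so far.
    private long streamTime = Long.MIN_VALUE;
    // Buffered counts keyed by window start (assumes non-negative timestamps).
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    SuppressBufferSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Called once per input record. Nothing else ever triggers emission,
    // so wall-clock time passing between calls has no effect.
    void process(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        long windowStart = recordTimestampMs - (recordTimestampMs % windowSizeMs);
        countsByWindowStart.merge(windowStart, 1L, Long::sum);
        enforceConstraints();
    }

    // Emit every buffered window whose end (plus grace) is at or before stream time.
    private void enforceConstraints() {
        Iterator<Map.Entry<Long, Long>> it = countsByWindowStart.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            long windowEnd = entry.getKey() + windowSizeMs;
            if (streamTime < windowEnd + graceMs) {
                break; // entries are sorted by start, so later windows are open too
            }
            emitted.add("[" + entry.getKey() + "," + windowEnd + ") count=" + entry.getValue());
            it.remove();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SuppressBufferSketch buffer = new SuppressBufferSketch(3000, 0);
        buffer.process(1000);                  // first "hello", event time 1 s
        // However long we wait here, stream time stays at 1000.
        System.out.println(buffer.emitted());  // prints []
        buffer.process(5000);                  // second "hello", event time 5 s
        System.out.println(buffer.emitted());  // prints [[0,3000) count=1]
    }
}
```

The first record alone never closes its window, because stream time (1000)
is still below the window end (3000). The second record moves stream time
to 5000, which closes the first window and emits its count.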


-Matthias

On 12/31/18 8:22 AM, jingguo yao wrote:
> Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> the window final result is not emitted after the window has elapsed?"
> 
> I have browsed the Kafka source code and found the cause of the
> mentioned behaviour.
> 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> has the following code:
> 
> @Override
> public void process(final K key, final Change<V> value) {
>   buffer(key, value);
>   enforceConstraints();
> }
> 
> The enforceConstraints() call in the above code emits window results
> under some conditions.
> 
> After process method processes the first record, the window begins.
> After some time, the window closes. But before process is invoked
> again (triggered by receiving another record), there is no chance to
> emit the window result.
> 
> Are there some configuration options to emit the window result without
> waiting for another record to arrive?
> 
> I am using Kafka 2.1.0, as contained in Confluent Open Source Edition
> 5.1.0.
> 
> On Sun, Dec 30, 2018 at 10:53 PM, jingguo yao <yaojingguo@gmail.com> wrote:
>>
>> I followed [1] to code a simple example to try the suppress operator.
>>
>> Here is the simple code:
>>
>> final Serde<String> stringSerde = Serdes.String();
>> final StreamsBuilder builder = new StreamsBuilder();
>> builder.stream("TextLinesTopic",
>>     Consumed.with(Serdes.String(), Serdes.String()))
>>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
>>   .groupBy((key, word) -> word,
>>       Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
>>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
>>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
>>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>   .toStream()
>>   .foreach(
>>       (key, value) -> {
>>         System.out.printf("key: %s, value: %d\n", key, value);
>>       });
>>
>> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
>> send one text line "hello", nothing is printed even if I wait for
>> more than 3 seconds (the window size). Since more time than the
>> window size has elapsed, I think that key and value should be printed.
>>
>> But if I send another text line "hello", key and value will be
>> printed.
>>
>> Can anyone explain this behavior? I have browsed the Kafka
>> documentation. But I can't find an explanation.
>>
>> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
>>
>>
>> --
>> Jingguo
> 
> 
> 

