kafka-users mailing list archives

From jingguo yao <yaojing...@gmail.com>
Subject Re: Whey does the window final result is not emitted after the window has elapsed?
Date Sat, 05 Jan 2019 05:01:31 GMT
Guozhang:

Thanks for your kind help.

Guozhang Wang <wangguoz@gmail.com> wrote on Sat, Jan 5, 2019 at 3:28 AM:
>
> Thanks for the detailed description.
>
> 1) Yes, the stream time can be advanced by any record.
> 2) Given your description, another way to work around the situation is to
> let your class send a final record with timestamp set as the class-end-time
> plus a small delta (think of it as a sentinel "tick" record just for
> advancing the clock), and therefore the stream time can be advanced still
> by the end of the last class.
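
Editorial sketch of the sentinel idea above, written as a plain-Java model rather than Kafka Streams code. It assumes that stream time is the largest record timestamp seen so far, that a window [start, start + size) is emitted once stream time reaches its end plus the grace period, and that all records share one stream time (partitioning is ignored). The class `StreamTimeModel` and its methods are hypothetical names. Note that under these assumptions the sentinel timestamp has to reach the end of the last window, so the "small delta" must be large enough to cross that boundary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of event-time window closing; not Kafka Streams code.
class StreamTimeModel {
    private final long windowSizeMs;
    private final long graceMs;
    // Largest record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;
    // Window start -> record count, for windows that have not been emitted yet.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();

    StreamTimeModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the start times of windows closed by it.
    List<Long> process(long timestampMs) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        // Records for windows that are already closed are dropped.
        if (start + windowSizeMs + graceMs > streamTimeMs) {
            openWindows.merge(start, 1L, Long::sum);
        }
        List<Long> closed = new ArrayList<>();
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSizeMs + graceMs <= streamTimeMs) {
            closed.add(openWindows.pollFirstEntry().getKey());
        }
        return closed;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        StreamTimeModel model = new StreamTimeModel(5 * minute, 0L);
        // Last real record at minute 57: window [55 min, 60 min) stays open.
        System.out.println(model.process(57 * minute)); // prints []
        // Sentinel "tick" at minute 60 advances stream time and closes it.
        System.out.println(model.process(60 * minute)); // prints [3300000]
    }
}
```

In this model the sentinel itself opens a new window that is never closed, so a real topology would probably want to filter sentinel records out before aggregating.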
>
> Guozhang
>
>
> On Thu, Jan 3, 2019 at 11:43 PM jingguo yao <yaojingguo@gmail.com> wrote:
>
> > Guozhang:
> >
> > Yes, my case is a real production scenario.
> >
> > I am monitoring on-line live-broadcast classes. I need to have a
> > summary of each 5-minute period for one class. Each class has a
> > classroom Id. I report class activity data to a Kafka topic. Classroom
> > id is used to partition these data. Here are my Kafka streaming steps:
> >
> > 1. groupByKey: with classroom id.
> > 2. windowedBy: with a 5-minute tumbling window.
> > 3. aggregate: do a summary over the window.
> > 4. Send the aggregate result to other external systems.
> >
> > Each class lasts about one hour. After the class ends, no data
> > will be sent for the class. Each night, there will be dozens of
> > classes.
> >
> > I have done more tests with my code. It seems that a new record to
> > advance the window for class A does not need to be a new record for
> > class A. A new record for any class can advance the window. Is this
> > behavior guaranteed by Kafka streams? Even if this behavior is
> > guaranteed, the last window for the last class at one night can't be
> > delivered until a new record arrives for a new class tomorrow night.
> >
> >
> > Guozhang Wang <wangguoz@gmail.com> wrote on Fri, Jan 4, 2019 at 2:59 AM:
> >
> > >
> > > Hello Jingguo,
> > >
> > > Is this case (i.e. you only have data over 57 minutes, and no new data
> > > afterwards) a real production scenario? In stream processing we usually
> > > expect the input data to stream in continuously, and I'm curious to learn
> > > your use case better and why it would not have further data after a
> > > period of time.
> > >
> > > ATM, if you really want to work around this issue, you can use a
> > > system-time based `punctuate` call, which is lower-level functionality
> > > than the `suppress` call in the DSL.
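
Editorial sketch of the wall-clock idea above. This is a plain-Java stand-in, not the Kafka Streams Processor API: `process` only buffers, and a separate `punctuate` method, which a timer would call with the current system time, emits every buffered window whose end has passed. It assumes record timestamps are close to wall-clock time; the class `WallClockFlushModel` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a system-time based flush; not Kafka Streams code.
class WallClockFlushModel {
    private final long windowSizeMs;
    // Window start -> record count, waiting to be emitted.
    private final TreeMap<Long, Long> pending = new TreeMap<>();

    WallClockFlushModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every input record: buffers the record, never emits.
    void process(long timestampMs) {
        long start = timestampMs - (timestampMs % windowSizeMs);
        pending.merge(start, 1L, Long::sum);
    }

    // Called periodically with the current wall-clock time. Returns the start
    // times of all buffered windows whose end is at or before that time.
    List<Long> punctuate(long wallClockMs) {
        List<Long> emitted = new ArrayList<>();
        while (!pending.isEmpty()
                && pending.firstKey() + windowSizeMs <= wallClockMs) {
            emitted.add(pending.pollFirstEntry().getKey());
        }
        return emitted;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        WallClockFlushModel model = new WallClockFlushModel(5 * minute);
        model.process(57 * minute);                       // last record of the class
        System.out.println(model.punctuate(58 * minute)); // prints []
        System.out.println(model.punctuate(60 * minute)); // prints [3300000]
    }
}
```

Unlike the event-time behaviour discussed in this thread, the last window is emitted here without any further input record, at the cost of tying the output to the system clock.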
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <yaojingguo@gmail.com> wrote:
> > >
> > > > Hi, Matthias
> > > >
> > > > I am doing a 5-minute tumbling window analysis over a 57-minute data
> > > > flow. And I want only one final result per window. So I need suppress.
> > > > The 57-minute period can be divided into about 12 windows. The results
> > > > of the first 11 windows can be delivered downstream. But the final
> > > > result for the last 2-minute window can never be delivered downstream
> > > > since there is no new record to advance the window.
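
Editorial sketch of the window arithmetic in the paragraph above. It assumes tumbling windows aligned to the epoch, so that a record with timestamp t falls into the window starting at t - (t mod size), and a data flow that starts exactly on a window boundary. The class `TumblingWindowMath` and its methods are hypothetical names, not part of Kafka Streams.

```java
import java.time.Duration;

// Hypothetical helper for tumbling-window arithmetic; not Kafka Streams code.
class TumblingWindowMath {
    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Number of windows touched by records with timestamps in [startMs, endMs).
    static long windowCount(long startMs, long endMs, long sizeMs) {
        long first = windowStart(startMs, sizeMs);
        long last = windowStart(endMs - 1, sizeMs);
        return (last - first) / sizeMs + 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long flowEnd = Duration.ofMinutes(57).toMillis();
        System.out.println("windows: " + windowCount(0, flowEnd, size)); // prints windows: 12
        long lastStart = windowStart(flowEnd - 1, size);
        // Prints last window: [55 min, 60 min)
        System.out.println("last window: [" + lastStart / 60_000 + " min, "
                + (lastStart + size) / 60_000 + " min)");
    }
}
```

The last window spans minutes 55 to 60 but holds only 2 minutes of data, and no record with a timestamp at or beyond minute 60 ever arrives to move stream time past its end.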
> > > >
> > > > Is there any workaround to deliver the result for the last window in
> > > > my situation?
> > > >
> > > > -- Jingguo
> > > >
> > > > Matthias J. Sax <matthias@confluent.io> wrote on Wed, Jan 2, 2019 at 10:27 PM:
> > > > >
> > > > > > After some time, the window closes.
> > > > >
> > > > > This is not correct. Windows are based on event-time, and because no
> > > > > new input record is processed, the window is not closed. That is the
> > > > > reason why you don't get any output. Only a new input record can
> > > > > advance "stream time" and close the window.
> > > > >
> > > > > In practice, when data flows continuously, this should not be an issue
> > > > > though.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > > > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > > > > the window final result is not emitted after the window has
> > > > > > elapsed?"
> > > > > >
> > > > > > I have browsed the Kafka source code and found the cause of the
> > > > > > mentioned behaviour.
> > > > > >
> > > > > > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > > > > has the following code:
> > > > > >
> > > > > > @Override
> > > > > > public void process(final K key, final Change<V> value) {
> > > > > >   buffer(key, value);
> > > > > >   enforceConstraints();
> > > > > > }
> > > > > >
> > > > > > enforceConstraints method invocation emits window results under
> > > > > > some conditions in the above code.
> > > > > >
> > > > > > After the process method processes the first record, the window
> > > > > > begins. After some time, the window closes. But before process is
> > > > > > invoked again (triggered by receiving another record), there is no
> > > > > > chance to emit the window result.
> > > > > >
> > > > > > Are there some configuration options to emit the window result
> > > > > > without waiting for another record to arrive?
> > > > > >
> > > > > > I am using Kafka 2.1.0 contained in Confluent Open Source Edition
> > > > > > 5.1.0.
> > > > > >
> > > > > > jingguo yao <yaojingguo@gmail.com> wrote on Sun, Dec 30, 2018 at 10:53 PM:
> > > > > >>
> > > > > >> I followed [1] to code a simple example to try the suppress operator.
> > > > > >>
> > > > > >> Here is the simple code:
> > > > > >>
> > > > > >> final Serde<String> stringSerde = Serdes.String();
> > > > > >> final StreamsBuilder builder = new StreamsBuilder();
> > > > > >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()))
> > > > > >>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
> > > > > >>   .groupBy((key, word) -> word,
> > > > > >>       Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > > > > >>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> > > > > >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > > > > >>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >>   .toStream()
> > > > > >>   .foreach(
> > > > > >>       (key, value) -> {
> > > > > >>         System.out.printf("key: %s, value: %d\n", key, value);
> > > > > >>       });
> > > > > >>
> > > > > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
> > > > > >> send one text line "hello", nothing will be printed even if I wait for
> > > > > >> more than 3 seconds (the window size). Since more time than the
> > > > > >> window size has elapsed, I think that the key and value should be
> > > > > >> printed.
> > > > > >>
> > > > > >> But if I send another text line "hello", the key and value will be
> > > > > >> printed.
> > > > > >>
> > > > > >> Can anyone explain this behavior? I have browsed the Kafka
> > > > > >> documentation. But I can't find an explanation.
> > > > > >>
> > > > > >> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Jingguo
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Jingguo
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
> >
> > --
> > Jingguo
> >
>
>
> --
> -- Guozhang



-- 
Jingguo
