kafka-users mailing list archives

From 杰 杨 <funk...@live.com>
Subject Re: Re: kafka steams with TimeWindows ,incorrect result
Date Sat, 10 Mar 2018 05:45:24 GMT
Thanks for your reply!
I see that Kafka Streams is designed to operate on an infinite, unbounded stream of data.
Now I want to process an unbounded stream, but divided into time intervals,
with one result per interval. How can I do this?
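
One possible direction, sketched in plain Java rather than with the Kafka Streams API: buffer the running aggregate per window and emit it only once the observed stream time has passed the window end. The class name `FinalWindowResultSketch` and its `add` method are hypothetical, and the sketch assumes a single key, a 60-second tumbling window as in the quoted code, and no grace period for late records. Later Kafka Streams releases (2.1 and up) added `KTable#suppress` with `Suppressed.untilWindowCloses(...)` for this purpose, which is not available in the version discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not part of Kafka Streams.
final class FinalWindowResultSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Window start -> running sum for windows that have not been emitted yet.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();
    // Largest record timestamp seen so far ("stream time").
    private long streamTimeMs = Long.MIN_VALUE;

    // Adds one record and returns the final results of all windows
    // that closed because stream time advanced past their end.
    List<String> add(long timestampMs, long value) {
        long start = timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
        if (start + WINDOW_SIZE_MS > streamTimeMs) {
            openWindows.merge(start, value, Long::sum);
        }
        // Otherwise the record is late for an already emitted window;
        // this sketch simply drops it (assumption: no grace period).
        streamTimeMs = Math.max(streamTimeMs, timestampMs);

        List<String> closed = new ArrayList<>();
        // A window is closed once its end (start + size) <= stream time.
        Map<Long, Long> done = openWindows.headMap(streamTimeMs - WINDOW_SIZE_MS, true);
        for (Map.Entry<Long, Long> e : done.entrySet()) {
            closed.add(e.getKey() + "/" + (e.getKey() + WINDOW_SIZE_MS) + " -> " + e.getValue());
        }
        done.clear(); // removes the emitted windows from openWindows
        return closed;
    }

    public static void main(String[] args) {
        FinalWindowResultSketch sketch = new FinalWindowResultSketch();
        System.out.println(sketch.add(1520601300000L, 1L));
        System.out.println(sketch.add(1520601330000L, 1L));
        // This record belongs to the next window, so the first one closes.
        System.out.println(sketch.add(1520601360000L, 1L));
    }
}
```

The trade-off is latency: a window's result appears only after a record from a later window arrives, whereas the default behavior shows intermediate values as soon as they are computed.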

________________________________
funkyyj@live.com

From: Guozhang Wang<mailto:wangguoz@gmail.com>
Date: 2018-03-10 02:50
To: users<mailto:users@kafka.apache.org>
Subject: Re: kafka steams with TimeWindows ,incorrect result
Hi Jie,

This is by design in Kafka Streams. Please read this doc for more details
(search for "outputs of the Wordcount application is actually a continuous
stream of updates"):

https://kafka.apache.org/0110/documentation/streams/quickstart

Note that these semantics apply to both windowed and non-windowed tables.
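
To illustrate the update-stream semantics, here is a minimal plain-Java sketch (not the Kafka Streams API; the class `WindowedUpdateSketch` and its methods are hypothetical). It folds records into 60-second tumbling windows and records every intermediate value of the running sum, which is roughly what a windowed table forwards downstream. With record caching enabled, Kafka Streams batches these updates until a commit or cache flush, so in practice fewer intermediate values show up, but there can still be more than one per window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not part of Kafka Streams.
final class WindowedUpdateSketch {
    static final long WINDOW_SIZE_MS = 60_000L;

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    // Adds each record to its window's running sum and records one
    // update per input record, instead of one result per window.
    static List<String> aggregate(String key, long[] timestamps, long[] values) {
        Map<Long, Long> sums = new HashMap<>();
        List<String> updates = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i]);
            long sum = sums.merge(start, values[i], Long::sum);
            updates.add(key + "@" + start + "/" + (start + WINDOW_SIZE_MS) + " -> " + sum);
        }
        return updates;
    }

    public static void main(String[] args) {
        long[] timestamps = {1520601300000L, 1520601330000L, 1520601359999L};
        long[] values = {1L, 1L, 1L};
        // All three records fall into the same window, yet three updates appear.
        for (String update : aggregate("k", timestamps, values)) {
            System.out.println(update);
        }
    }
}
```

Each printed line supersedes the previous one for the same key and window; a consumer that wants the current result keeps only the latest value per windowed key.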


Guozhang

On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <funkyyj@live.com> wrote:

> Hi:
> I used TimeWindows to aggregate data in Kafka.
>
> This is the code snippet:
>
> view.flatMap(new MultipleKeyValueMapper(client))
>     .groupByKey(Serialized.with(
>         Serdes.String(),
>         Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
>     .windowedBy(TimeWindows.of(60000))
>     .reduce(new Reducer<CountInfo>() {
>         @Override
>         public CountInfo apply(CountInfo value1, CountInfo value2) {
>             return new CountInfo(value1.start + value2.start,
>                                  value1.active + value2.active,
>                                  value1.fresh + value2.fresh);
>         }
>     })
>     .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
>         @Override
>         public String apply(Windowed<String> key, CountInfo value) {
>             return key.key();
>         }
>     })
>     .print(Printed.toSysOut());
>
> KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
> streams.start();
>
> I tested this with 30000 records in Kafka
> and printed the keys and values:
>
>
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
> Why are two results printed for each key within one window, instead of one?
>
> ________________________________
> funkyyj@live.com
>



--
-- Guozhang