kafka-users mailing list archives

From 杰 杨 <funk...@live.com>
Subject Kafka Streams with TimeWindows, incorrect result
Date Fri, 09 Mar 2018 13:36:17 GMT
Hi,
I use TimeWindows to aggregate data in Kafka Streams.

This is the code snippet:

  view.flatMap(new MultipleKeyValueMapper(client))
      .groupByKey(Serialized.with(
          Serdes.String(),
          Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
      // 60000 ms = one-minute tumbling windows
      .windowedBy(TimeWindows.of(60000))
      // sum the three counters of all records sharing a key and a window
      .reduce(new Reducer<CountInfo>() {
          @Override
          public CountInfo apply(CountInfo value1, CountInfo value2) {
              return new CountInfo(value1.start + value2.start,
                                   value1.active + value2.active,
                                   value1.fresh + value2.fresh);
          }
      })
      // the mapper returns the plain string key of each windowed key
      .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
          @Override
          public String apply(Windowed<String> key, CountInfo value) {
              return key.key();
          }
      })
      .print(Printed.toSysOut());

  KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
  streams.start();

I tested this with 30000 records in Kafka and printed each key and value:


[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
Why are two results printed for each key within a single window, instead of one?
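
One possible reading of the output, offered as an assumption rather than a confirmed diagnosis: both lines for a given key carry the same window (1520601300000/1520601360000), so the second line may be an updated running total for that window and not a second window. Kafka Streams treats the result of a windowed reduce as a KTable whose stream is a changelog of updates, and with record caching those updates are forwarded when the cache is flushed or on commit. The sketch below is plain Java with no Kafka dependency. The names WindowedReduceSketch, windowStart and simulate are hypothetical, and the flush point of 12179 records is chosen only to mirror the printed numbers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedReduceSketch {

    // Start of the tumbling window that contains timestampMs,
    // assuming windows are aligned to the epoch (non-negative timestamps).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Feeds `total` records of value 1 for one key into a per-window running sum.
    // The current state is "emitted" after `flushAfter` records and again after
    // the last record, imitating (as an assumption) a cache flush or commit.
    static List<String> simulate(String key, long baseTs, long sizeMs,
                                 int total, int flushAfter) {
        Map<String, Long> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 1; i <= total; i++) {
            long ts = baseTs + (i % 1000); // every record falls in the same minute
            long start = windowStart(ts, sizeMs);
            String windowedKey = key + "@" + start + "/" + (start + sizeMs);
            state.merge(windowedKey, 1L, Long::sum);
            if (i == flushAfter || i == total) {
                for (Map.Entry<String, Long> e : state.entrySet()) {
                    emitted.add(e.getKey() + " -> " + e.getValue());
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical flush after 12179 of 30000 records.
        simulate("99999_99999_2018-03-09", 1520601300000L, 60000L, 30000, 12179)
                .forEach(System.out::println);
    }
}
```

Under these assumptions the sketch emits two lines for the single window, first a partial total and then the final one, which matches the shape of the printed output. If that reading is right, the last value printed for a window is its complete result, and the earlier lines are intermediate updates.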

________________________________
funkyyj@live.com