kafka-users mailing list archives

From Adrienne Kole <adrienneko...@gmail.com>
Subject Kafka Streams windows strange behavior
Date Thu, 08 Sep 2016 15:19:59 GMT
Hi,

I am trying to implement a simple scenario with Kafka's Streams library.

I insert data into a Kafka topic at 1 tuple/second.
The Streams application consumes that topic and:
            1. constructs 8-second windows with a 4-second slide,
            2. sums the tuples' price field and increments a count field
(to eventually compute an average),
            3. outputs the results.

So one would expect aggregation results more or less once every 4 seconds.
However, I get 2 outputs per second.

After analyzing a bit, I can conclude that for each tuple the Streams
library updates the aggregate of every window the tuple belongs to (as each
tuple can be part of several overlapping windows) and directly sends the
result downstream. It keeps the aggregated object per window, so it can be
updated in place when a new tuple arrives, which makes it non-blocking.
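That fan-out can be checked with a quick standalone sketch (a hypothetical helper, not part of the Streams API) that lists the start times of all hopping windows of a given size and advance containing a timestamp:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Start times of all windows [start, start + sizeMs) that contain ts,
    // for hopping windows advancing by advanceMs.
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long last = ts - (ts % advanceMs); // latest window starting at or before ts
        for (long start = last; start > ts - sizeMs; start -= advanceMs) {
            if (start >= 0) starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 8 s windows advancing by 4 s: every timestamp falls into 2 windows,
        // hence 2 downstream updates per input record at 1 tuple/second.
        System.out.println(windowStartsFor(10_000L, 8_000L, 4_000L)); // [8000, 4000]
    }
}
```

With size/advance = 8000/4000 the ratio is 2, which matches the 2 outputs per second observed below.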

However, I see that the window result is not updated with respect to
previous values. That is, although I increment a count field on the
aggregated object, I see count=1 for every output.

Here is my code:



        public static void main(String[] args) throws Exception {

            if (args == null || args.length != 1) {
                throw new Exception("command-line argument is needed: path to configuration file");
            }
            String confPath = args[0];
            YamlReader reader = new YamlReader(new FileReader(confPath));
            Object object = reader.read();
            Map conf = (Map)object;

            ArrayList<String> bootstrapServersArr = (ArrayList<String>) conf.get("kafka.brokers");
            Integer kafkaPort = new Integer(conf.get("kafka.port").toString());
            String bootstrapServers = conf.get("bootstrap.servers").toString();
            String zookeeperServers = conf.get("zoo.servers").toString();

            String kafkaTopic = conf.get("kafka.topic").toString();
            String dataGeneratorHost = InetAddress.getLocalHost().getHostName();
            Integer dataGeneratorPort = new Integer(conf.get("datasourcesocket.port").toString());

            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-benchmark");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperServers);
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


            int slideWindowLength = new Integer(conf.get("slidingwindow.length").toString()); // 8000 ms
            int slideWindowSlide = new Integer(conf.get("slidingwindow.slide").toString());   // 4000 ms

            KStreamBuilder builder = new KStreamBuilder();
            KStream<Long, String> source = builder.stream(Serdes.Long(), Serdes.String(), kafkaTopic);

            KStream<Long, Double> tsAppended = source.map(new KeyValueMapper<Long, String, KeyValue<Long, Double>>() {
                @Override
                public KeyValue<Long, Double> apply(Long aLong, String s) {
                    JSONObject obj = new JSONObject(s);
                    Double val = obj.getJSONObject("m").getDouble("price");
                    Long id = obj.getJSONObject("s").getLong("aid1");

                    return new KeyValue<Long, Double>(id, val);
                }
            });

            KTable<Windowed<Long>, AvgValue> windowAgg = tsAppended.aggregateByKey(
                    new Initializer<AvgValue>() {
                        @Override
                        public AvgValue apply() {
                            return new AvgValue(0, 0D, 0L);
                        }
                    },
                    new AvgAggregator<Long, Double, AvgValue>(),
                    TimeWindows.of("timewindow", slideWindowLength).advanceBy(slideWindowSlide),
                    Serdes.Long(), new AvgSerde()
            );

            KTable<Windowed<Long>, AvgValue> windowAggResult = windowAgg.mapValues((v) -> {
                System.out.println(v.getSum() + " - " + v.getCount());
                return v;
            });


            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }


    public static class AvgAggregator<K, V, T> implements Aggregator<Long, Double, AvgValue> {
        @Override
        public AvgValue apply(Long id, Double price, AvgValue avg) {
            avg.setSum(avg.getSum() + price);
            avg.setCount(avg.getCount() + 1);
            return avg;
        }
    }

}
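The AvgValue class itself is not shown in the snippet above; a minimal, hypothetical version consistent with the constructor and accessors it uses (new AvgValue(0, 0D, 0L), getSum()/setSum(), getCount()/setCount()) might look like this:

```java
// Hypothetical reconstruction of AvgValue; the actual class from my project
// is not included in this mail.
public class AvgValue {
    private int id;       // assumed from the first constructor argument
    private double sum;   // running sum of prices in the window
    private long count;   // number of tuples aggregated so far

    public AvgValue(int id, double sum, long count) {
        this.id = id;
        this.sum = sum;
        this.count = count;
    }

    public double getSum() { return sum; }
    public void setSum(double sum) { this.sum = sum; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }

    // Final average once the window closes.
    public double average() { return count == 0 ? 0D : sum / count; }
}
```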



I print each tuple where it is generated and compare it with the value
printed by kafka-streams.

price - count

29.78169         // this is generator
29.78169 - 1     // this is kafka-streams
29.78169 - 1     // this is kafka-streams
30.767307        // this is generator
30.767307 - 1    // this is kafka-streams
30.767307 - 1    // this is kafka-streams
8.034571         // this is generator
8.034571 - 1     // this is kafka-streams
8.034571 - 1     // this is kafka-streams
79.36154         // this is generator
79.36154 - 1     // this is kafka-streams
79.36154 - 1     // this is kafka-streams
14.394047        // this is generator
14.394047 - 1    // this is kafka-streams
14.394047 - 1    // this is kafka-streams
70.00672         // this is generator
70.00672 - 1     // this is kafka-streams
70.00672 - 1     // this is kafka-streams
58.836906        // this is generator
58.836906 - 1    // this is kafka-streams
58.836906 - 1    // this is kafka-streams
20.771767        // this is generator
20.771767 - 1    // this is kafka-streams
20.771767 - 1    // this is kafka-streams
64.35614         // this is generator
64.35614 - 1     // this is kafka-streams
64.35614 - 1     // this is kafka-streams



So the tuples do not pick up the accumulated aggregate values as they pass
through a window.

Am I doing something wrong, or what could be the reason for this strange
behavior?


Cheers
Adrienne
