kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case
Date Mon, 04 Jun 2018 17:45:44 GMT
Hello,

Your observation is correct, Kafka Streams by default will print continuous
updates to each window, instead of waiting for the "final" update for each
window.

There are some ongoing work to provide the functionality to allow users
specify sth. like "give me the final result for windowed aggregations" in
the DSL, it will probably come post 2.0 release.

Guozhang


On Mon, Jun 4, 2018 at 8:14 AM, EC Boost <ecboost2020@gmail.com> wrote:

> Logged the internal windows information:
>
> Window{start=1528043030000, end=1528043040000} key=t6  1
> Window{start=1528043040000, end=1528043050000} key=t1  2
> Window{start=1528043040000, end=1528043050000} key=t7  3
> Window{start=1528043040000, end=1528043050000} key=t5  4
> Window{start=1528043040000, end=1528043050000} key=t5  4,5
> Window{start=1528043050000, end=1528043060000} key=t6  6
> Window{start=1528043050000, end=1528043060000} key=t6  6,7
> Window{start=1528043050000, end=1528043060000} key=t4  8
> Window{start=1528043060000, end=1528043070000} key=t6  9
> Window{start=1528043060000, end=1528043070000} key=t7  10
> Window{start=1528043060000, end=1528043070000} key=t6  9,11
> Window{start=1528043070000, end=1528043080000} key=t5  12
> Window{start=1528043070000, end=1528043080000} key=t6  13
> Window{start=1528043070000, end=1528043080000} key=t4  14
> Window{start=1528043070000, end=1528043080000} key=t4  14,15
>
> ....
>
> It seems that Kafka Stream send all the  KTable changelog as output and
> that's probably why there's duplicate outputs for gap-less non-overlapping
> window.
>
> Is there any way to achieve real mini-batch-like style processing semantics
> using non-overlapping windows which means only the last  value will be sent
> as output not all the changelogs in the windows?
>
>
> On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2020@gmail.com> wrote:
>
> > Hello Everyone,
> >
> > I got duplicated results using kstreams for simple  windowed aggregation.
> >
> > The input event format is comma seperated:  "event_id,event_type" and I
> > need to aggregate them by event type.
> >
> > Following is the Kafka Stream processing logic:
> >
> > events
> >       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
> >       .groupByKey()
> >       .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
> >       .aggregate(
> >         ArrayList::new,
> >         (type, id, eventList) -> {
> >           eventList.add(id);
> >           return eventList;
> >         },
> >         Materialized.with(stringSerde, arraySerde)
> >       )
> >       .toStream((k,v) -> k.key())
> >       .mapValues((v)-> String.join(",", v))
> >       .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
> >
> >
> > I produced the input messages using the following snippet:
> >
> > require "kafka"
> >
> > kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
> >
> > f = File.open("events.txt")
> > f.each_line { |l|
> >   puts l
> >   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
> >   sleep(3)
> > }
> >
> >
> >
> > Messages in events.txt is the following ( format : "event_id,event_type"
> > and event_id is unique )  :
> >
> > Input
> >
> > 1,t6
> > 2,t1
> > 3,t7
> > 4,t5
> > 5,t5
> > 6,t6
> > 7,t6
> > 8,t4
> > 9,t6
> > 10,t7
> > 11,t6
> > 12,t5
> > 13,t6
> > 14,t4
> > 15,t4
> > 16,t2
> > 17,t7
> > 18,t6
> > 19,t3
> > 20,t7
> > 21,t1
> > 22,t5
> > 23,t5
> > 24,t6
> > 25,t6
> > 26,t4
> > 27,t4
> > 28,t3
> > 29,t2
> > 30,t5
> > 31,t1
> > 32,t1
> > 33,t1
> > 34,t1
> > 35,t2
> > 36,t4
> > 37,t3
> > 38,t3
> > 39,t6
> > 40,t6
> > 41,t1
> > 42,t4
> > 43,t4
> > 44,t6
> > 45,t6
> > 46,t7
> > 47,t7
> > 48,t3
> > 49,t1
> > 50,t6
> > 51,t1
> > 52,t4
> > 53,t6
> > 54,t7
> > 55,t1
> > 56,t1
> > 57,t1
> > 58,t5
> > 59,t6
> > 60,t7
> > 61,t6
> > 62,t4
> > 63,t5
> > 64,t1
> > 65,t3
> > 66,t1
> > 67,t3
> > 68,t3
> > 69,t5
> > 70,t1
> > 71,t6
> > 72,t5
> > 73,t6
> > 74,t1
> > 75,t7
> > 76,t5
> > 77,t3
> > 78,t1
> > 79,t4
> > 80,t3
> > 81,t6
> > 82,t2
> > 83,t6
> > 84,t2
> > 85,t4
> > 86,t7
> > 87,t4
> > 88,t6
> > 89,t5
> > 90,t6
> > 91,t4
> > 92,t3
> > 93,t4
> > 94,t6
> > 95,t2
> > 96,t2
> > 97,t7
> > 98,t4
> > 99,t3
> > 100,t3
> >
> > <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81
> c2#output>
> >
> > But got the following output with duplicate event_ids between windows :
> >
> > Output
> >
> > t6    1
> > t1    2
> > t7    3
> > t5    4
> > t5    4,5
> > t6    6
> > t6    6,7
> > t4    8
> > t6    9
> > t7    10
> > t6    9,11
> > t5    12
> > t6    13
> > t4    14
> > t4    14,15
> > t2    16
> > t7    17
> > t6    18
> > t3    19
> > t7    20
> > t1    21
> > t5    22
> > t5    22,23
> > t6    24
> > t6    24,25
> > t4    26
> > t4    26,27
> > t3    28
> > t2    29
> > t5    30
> > t1    31
> > t1    32
> > t1    32,33
> > t1    32,33,34
> > t2    35
> > t4    36
> > t3    37
> > t3    37,38
> > t6    39
> > t6    39,40
> > t1    41
> > t4    42
> > t4    42,43
> > t6    44
> > t6    44,45
> > t7    46
> > t7    46,47
> > t3    48
> > t1    49
> > t6    50
> > t1    49,51
> > t4    52
> > t6    53
> > t7    54
> > t1    55
> > t1    56
> > t1    56,57
> > t5    58
> > t6    59
> > t7    60
> > t6    59,61
> > t4    62
> > t5    63
> > t1    64
> > t3    65
> > t1    66
> > t3    67
> > t3    67,68
> > t5    69
> > t1    70
> > t6    71
> > t5    72
> > t6    73
> > t1    74
> > t7    75
> > t5    76
> > t3    77
> > t1    78
> > t4    79
> > t3    80
> > t6    81
> > t2    82
> > t6    83
> > t2    82,84
> > t4    85
> > t7    86
> > t4    87
> > t6    88
> > t5    89
> > t6    90
> > t4    91
> > t3    92
> > t4    93
> > t6    94
> > t2    95
> > t2    96
> > t7    97
> > t4    98
> > t3    99
> > t3    99,100
> >
> >
> >
> > Since I am using non-overlapping gap-less windows in kstream processing
> > dsl, the correct ouput should NOT contain duplicate event ids between
> > windows.  Any ideas why the duplicates ?   ( Link for the debug project:
> > https://github.com/westec/ks-aggregate-debug )
> >
> > Appreciate for your help!
> >
> > Regards,
> > EC
> >
> >
> >
> >
> >
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message