kafka-users mailing list archives

From EC Boost <ecboost2...@gmail.com>
Subject Re: Kafka Streams Produced Wrong (duplicated) Results with Simple Windowed Aggregation Case
Date Mon, 04 Jun 2018 15:14:51 GMT
I logged the internal window information:

Window{start=1528043030000, end=1528043040000} key=t6  1
Window{start=1528043040000, end=1528043050000} key=t1  2
Window{start=1528043040000, end=1528043050000} key=t7  3
Window{start=1528043040000, end=1528043050000} key=t5  4
Window{start=1528043040000, end=1528043050000} key=t5  4,5
Window{start=1528043050000, end=1528043060000} key=t6  6
Window{start=1528043050000, end=1528043060000} key=t6  6,7
Window{start=1528043050000, end=1528043060000} key=t4  8
Window{start=1528043060000, end=1528043070000} key=t6  9
Window{start=1528043060000, end=1528043070000} key=t7  10
Window{start=1528043060000, end=1528043070000} key=t6  9,11
Window{start=1528043070000, end=1528043080000} key=t5  12
Window{start=1528043070000, end=1528043080000} key=t6  13
Window{start=1528043070000, end=1528043080000} key=t4  14
Window{start=1528043070000, end=1528043080000} key=t4  14,15

....

It seems that Kafka Streams sends every KTable changelog update as output,
and that's probably why there are duplicate outputs even for gap-less,
non-overlapping windows.
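For reference, the window each record lands in follows directly from its timestamp: a 10-second tumbling window truncates the timestamp down to a multiple of the window size. A minimal stdlib sketch (only the window boundaries come from the log above; the example timestamps are hypothetical arrival times within them):

```java
public class WindowAssignment {
    static final long WINDOW_SIZE_MS = 10_000L; // TimeWindows.of(TimeUnit.SECONDS.toMillis(10))

    // A tumbling (non-overlapping, gap-less) window starts at the
    // timestamp truncated down to a multiple of the window size.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // Both fall into Window{start=1528043040000, end=1528043050000},
        // so "4" and then "4,5" are two updates of ONE window,
        // not results from two different windows.
        System.out.println(windowStart(1528043041000L)); // 1528043040000
        System.out.println(windowStart(1528043049999L)); // 1528043040000
    }
}
```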

Is there any way to achieve real mini-batch-style processing semantics
using non-overlapping windows, meaning that only the final value per window
is sent as output, rather than every intermediate changelog update?
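To make the semantics I'm after concrete, here is a plain-Java simulation (the class and method names are made up for illustration; this is not the Kafka Streams API): each window's aggregate is buffered per (window, key) and emitted exactly once, when event time moves past the window end.

```java
import java.util.*;

// Hypothetical sketch of "emit only the final value per window":
// aggregates are buffered and flushed only once the observed event
// time passes the window end, so intermediate updates (e.g. "4"
// before "4,5") never reach the output.
public class FinalResultWindows {
    static final long WINDOW_MS = 10_000L;

    // (windowStart + "|" + key) -> ids collected so far
    final Map<String, List<String>> open = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>();
    long watermark = Long.MIN_VALUE;

    void accept(long ts, String key, String id) {
        long start = ts - ts % WINDOW_MS;
        open.computeIfAbsent(start + "|" + key, k -> new ArrayList<>()).add(id);
        watermark = Math.max(watermark, ts);
        // flush every window whose end is at or before the current event time
        open.entrySet().removeIf(e -> {
            long winStart = Long.parseLong(e.getKey().substring(0, e.getKey().indexOf('|')));
            if (winStart + WINDOW_MS <= watermark) {
                emitted.add(e.getKey() + " => " + String.join(",", e.getValue()));
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        FinalResultWindows fw = new FinalResultWindows();
        fw.accept(1528043041000L, "t5", "4");
        fw.accept(1528043043000L, "t5", "5"); // same window: nothing emitted yet
        fw.accept(1528043051000L, "t6", "6"); // next window opens, previous closes
        System.out.println(fw.emitted);       // [1528043040000|t5 => 4,5]
    }
}
```

As far as I can tell, the record caches in Kafka Streams (cache.max.bytes.buffering together with commit.interval.ms) can reduce how often intermediate updates are forwarded downstream, but they do not guarantee exactly one output per window like the sketch above.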


On Mon, Jun 4, 2018 at 1:25 AM, EC Boost <ecboost2020@gmail.com> wrote:

> Hello Everyone,
>
> I am getting duplicated results using Kafka Streams for a simple windowed
> aggregation.
>
> The input event format is comma-separated ("event_id,event_type"), and I
> need to aggregate the events by event type.
>
> Following is the Kafka Stream processing logic:
>
> events
>       .map((k, v) -> KeyValue.pair(v.split(",")[1], v.split(",")[0]))
>       .groupByKey()
>       .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(10)))
>       .aggregate(
>         ArrayList::new,
>         (type, id, eventList) -> {
>           eventList.add(id);
>           return eventList;
>         },
>         Materialized.with(stringSerde, arraySerde)
>       )
>       .toStream((k,v) -> k.key())
>       .mapValues((v)-> String.join(",", v))
>       .to("ks-debug-output", Produced.with(stringSerde, stringSerde));
>
>
> I produced the input messages using the following snippet:
>
> require "kafka"
>
> kafka = Kafka.new(["localhost:9092"], client_id: "event-producer")
>
> f = File.open("events.txt")
> f.each_line { |l|
>   puts l
>   kafka.deliver_message("#{l.strip}", topic: "ks-debug-input")
>   sleep(3)
> }
>
>
>
> The messages in events.txt are as follows (format: "event_id,event_type",
> where event_id is unique):
>
> Input
>
> 1,t6
> 2,t1
> 3,t7
> 4,t5
> 5,t5
> 6,t6
> 7,t6
> 8,t4
> 9,t6
> 10,t7
> 11,t6
> 12,t5
> 13,t6
> 14,t4
> 15,t4
> 16,t2
> 17,t7
> 18,t6
> 19,t3
> 20,t7
> 21,t1
> 22,t5
> 23,t5
> 24,t6
> 25,t6
> 26,t4
> 27,t4
> 28,t3
> 29,t2
> 30,t5
> 31,t1
> 32,t1
> 33,t1
> 34,t1
> 35,t2
> 36,t4
> 37,t3
> 38,t3
> 39,t6
> 40,t6
> 41,t1
> 42,t4
> 43,t4
> 44,t6
> 45,t6
> 46,t7
> 47,t7
> 48,t3
> 49,t1
> 50,t6
> 51,t1
> 52,t4
> 53,t6
> 54,t7
> 55,t1
> 56,t1
> 57,t1
> 58,t5
> 59,t6
> 60,t7
> 61,t6
> 62,t4
> 63,t5
> 64,t1
> 65,t3
> 66,t1
> 67,t3
> 68,t3
> 69,t5
> 70,t1
> 71,t6
> 72,t5
> 73,t6
> 74,t1
> 75,t7
> 76,t5
> 77,t3
> 78,t1
> 79,t4
> 80,t3
> 81,t6
> 82,t2
> 83,t6
> 84,t2
> 85,t4
> 86,t7
> 87,t4
> 88,t6
> 89,t5
> 90,t6
> 91,t4
> 92,t3
> 93,t4
> 94,t6
> 95,t2
> 96,t2
> 97,t7
> 98,t4
> 99,t3
> 100,t3
>
> <https://gist.github.com/stonegao/087fc0a06fc81177b452a651c16e81c2#output>
>
> But I got the following output, with duplicate event_ids between windows:
>
> Output
>
> t6	1
> t1	2
> t7	3
> t5	4
> t5	4,5
> t6	6
> t6	6,7
> t4	8
> t6	9
> t7	10
> t6	9,11
> t5	12
> t6	13
> t4	14
> t4	14,15
> t2	16
> t7	17
> t6	18
> t3	19
> t7	20
> t1	21
> t5	22
> t5	22,23
> t6	24
> t6	24,25
> t4	26
> t4	26,27
> t3	28
> t2	29
> t5	30
> t1	31
> t1	32
> t1	32,33
> t1	32,33,34
> t2	35
> t4	36
> t3	37
> t3	37,38
> t6	39
> t6	39,40
> t1	41
> t4	42
> t4	42,43
> t6	44
> t6	44,45
> t7	46
> t7	46,47
> t3	48
> t1	49
> t6	50
> t1	49,51
> t4	52
> t6	53
> t7	54
> t1	55
> t1	56
> t1	56,57
> t5	58
> t6	59
> t7	60
> t6	59,61
> t4	62
> t5	63
> t1	64
> t3	65
> t1	66
> t3	67
> t3	67,68
> t5	69
> t1	70
> t6	71
> t5	72
> t6	73
> t1	74
> t7	75
> t5	76
> t3	77
> t1	78
> t4	79
> t3	80
> t6	81
> t2	82
> t6	83
> t2	82,84
> t4	85
> t7	86
> t4	87
> t6	88
> t5	89
> t6	90
> t4	91
> t3	92
> t4	93
> t6	94
> t2	95
> t2	96
> t7	97
> t4	98
> t3	99
> t3	99,100
>
>
>
> Since I am using non-overlapping, gap-less windows in the Kafka Streams
> processing DSL, the correct output should NOT contain duplicate event ids
> between windows. Any ideas why the duplicates appear? (Link to the debug
> project: https://github.com/westec/ks-aggregate-debug )
>
> I appreciate your help!
>
> Regards,
> EC
