kafka-users mailing list archives

From Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com>
Subject Re: Window aggregation skipping some data
Date Wed, 17 Jul 2019 16:25:36 GMT
It seems that completely removing the grace period fixes the problem.

Is that expected? Is the grace period per key or global?

--
Alessandro Tagliapietra

On Wed, Jul 17, 2019 at 12:07 AM Alessandro Tagliapietra <
tagliapietra.alessandro@gmail.com> wrote:

> I've added a reproduction repo here if anyone wants to have a look at a
> full working example:
>
> https://github.com/alex88/kafka-error-repro
>
> At the top of WindowTest.java you can see the messages it sends, and
> below that the part that generates the sequence and the window.
>
> I've also included the window-only part
> https://github.com/alex88/kafka-error-repro/blob/window_only/src/main/java/myapps/WindowTest.java
> since this still happens even with only a window.
> The aggregate function is called for all the records of one key, but not
> for all the records of the second.
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Jul 16, 2019 at 10:36 PM Alessandro Tagliapietra <
> tagliapietra.alessandro@gmail.com> wrote:
>
>> Actually, suppress doesn't matter; it happens later in the code. I've also
>> tried removing it and adding a grace period to the window function, but
>> the issue persists.
>>
>> --
>> Alessandro Tagliapietra
>>
>> On Tue, Jul 16, 2019 at 10:17 PM Alessandro Tagliapietra <
>> tagliapietra.alessandro@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I have an issue trying to window data. I'm sending the exact same data
>>> to a topic for 2 different keys; key number 1 behaves properly, but key
>>> number 2 doesn't.
>>>
>>> As you can see here
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-gistfile1-txt
>>> I'm sending 4 messages for each key. The first 3 messages of each key
>>> have all values set to 0; the 4th has different values.
>>>
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-code-java
>>> This is the code I have (there are more things before and after, but I've
>>> narrowed the issue down to this part).
>>>
>>> https://gist.github.com/alex88/dd68a0ce4ae46c37edfc7492b6e16bc8#file-output-log
>>> This is the output I see in the console.
>>>
>>> Basically, in the first piece of code I use a store + transformValues to
>>> turn a stream of data into a stream of pairs, e.g. 1, 2, 3, 4 becomes
>>> (1,2), (2,3), (3,4) and so on.
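A minimal sketch of the pairing idea, assuming the store is used as a per-key "last value seen" lookup. A plain HashMap stands in for the Kafka Streams state store here, and the names (PairingSketch, next) are hypothetical, not taken from the gist:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PairingSketch {
    // Hypothetical stand-in for the state store: last value seen for each key.
    private final Map<String, Integer> lastValueByKey = new HashMap<>();

    /**
     * Returns the pair (previous, current) for this key, or null when this is
     * the first value seen for the key (there is nothing to pair it with yet).
     */
    public int[] next(String key, int value) {
        // Map.put returns the previous value for the key, or null if none.
        Integer previous = lastValueByKey.put(key, value);
        return previous == null ? null : new int[] {previous, value};
    }

    public static void main(String[] args) {
        PairingSketch pairing = new PairingSketch();
        List<String> pairs = new ArrayList<>();
        for (int v : new int[] {1, 2, 3, 4}) {
            int[] pair = pairing.next("sensor-1", v);
            if (pair != null) {
                pairs.add("(" + pair[0] + "," + pair[1] + ")");
            }
        }
        System.out.println(String.join(", ", pairs)); // prints "(1,2), (2,3), (3,4)"
    }
}
```

Because the lookup is keyed, interleaving records of different keys does not mix their pairs.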
>>> After that step I log every sensor + value I get (in the foreach). The 8
>>> messages I send get flatMapped into 16 single messages, and I see 16
>>> logs, which means that every message I received went through that part.
>>>
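The counts above are simple arithmetic: 2 keys with 4 messages each is 8 messages, and 2 metrics per message gives 16 records, 8 per key. A JDK-only sketch of that flatMap, where the message shape (SensorMessage, MetricRecord) and the metric names and values are hypothetical placeholders, not the schema from the repro (records need Java 16+):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlatMapSketch {
    // Hypothetical input: one message carries several metrics for a sensor key.
    record SensorMessage(String key, Map<String, Double> metrics) {}

    // Hypothetical output: one record per metric, keeping the source key.
    record MetricRecord(String key, String metric, double value) {}

    static List<MetricRecord> flatten(List<SensorMessage> messages) {
        List<MetricRecord> out = new ArrayList<>();
        for (SensorMessage m : messages) {
            for (Map.Entry<String, Double> e : m.metrics().entrySet()) {
                out.add(new MetricRecord(m.key(), e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SensorMessage> messages = new ArrayList<>();
        for (String key : List.of("1", "2")) {
            for (int i = 0; i < 4; i++) {
                Map<String, Double> metrics = new LinkedHashMap<>();
                // First 3 messages all zeros, 4th with different (made-up) values.
                metrics.put("metricA", i < 3 ? 0.0 : 1.0);
                metrics.put("metricB", i < 3 ? 0.0 : 2.0);
                messages.add(new SensorMessage(key, metrics));
            }
        }
        List<MetricRecord> records = flatten(messages);
        long key1 = records.stream().filter(r -> r.key().equals("1")).count();
        System.out.println(messages.size() + " messages -> " + records.size()
                + " records, " + key1 + " for key 1"); // prints "8 messages -> 16 records, 8 for key 1"
    }
}
```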
>>> The problem comes after that. First I use groupByKey so I can window
>>> each key independently, then I use my aggregate function, and it's at
>>> this point that I don't see all the messages I expected to receive from
>>> the previous step.
>>> As you can see from the log, key 1 has its log called 8 times (4
>>> messages, each with 2 metrics, flatMapped into 8 messages), while key 2
>>> has that log called only 3 times.
>>> At first I thought it might be skipping some values because of the
>>> suppress, but both keys have the same values, so it should either
>>> suppress both or neither. Plus, all messages are ordered and the
>>> timestamp extractor reads the timestamp from the message.
>>>
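One key being fully aggregated while the other is only partially aggregated is what a stream time shared across keys would produce. Below is a minimal JDK-only simulation under these assumptions: both keys land on the same partition, all of key 1's records are read before key 2's, and the drop rule works as I understand Kafka Streams' 2.x windowed aggregation. In that rule, stream time is the maximum timestamp the task has observed (shared by every key on the partition), and a record is skipped when its window end is not after stream time minus the grace period. The class and method names, the timestamps, and the 1-minute tumbling window are illustrative assumptions, not the code from the gist or the repro repo:

```java
import java.util.ArrayList;
import java.util.List;

public class GraceSketch {
    record Rec(String key, long ts) {}

    /**
     * Simulates tumbling-window aggregation on ONE partition and returns the
     * records that would reach the aggregator. streamTime is shared by all keys.
     */
    static List<Rec> accepted(List<Rec> input, long windowSize, long grace) {
        long streamTime = Long.MIN_VALUE;
        List<Rec> out = new ArrayList<>();
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts());          // advances for every key
            long windowStart = r.ts() - (r.ts() % windowSize);  // assumes ts >= 0
            long windowEnd = windowStart + windowSize;
            long closeTime = streamTime - grace;
            if (windowEnd > closeTime) {
                out.add(r);                                     // window still open
            }                                                   // else: dropped as late
        }
        return out;
    }

    static long count(List<Rec> recs, String key) {
        return recs.stream().filter(r -> r.key().equals(key)).count();
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        for (String key : List.of("1", "2")) {        // all of key 1 first, then key 2
            for (long i = 0; i < 4; i++) {
                input.add(new Rec(key, i * 60_000L)); // same timestamps for both keys
            }
        }
        long windowSize = 60_000L;
        for (long grace : new long[] {0L, 86_400_000L}) {
            List<Rec> ok = accepted(input, windowSize, grace);
            System.out.println("grace=" + grace + "ms: key 1 -> " + count(ok, "1")
                    + ", key 2 -> " + count(ok, "2"));
        }
        // prints:
        // grace=0ms: key 1 -> 4, key 2 -> 1
        // grace=86400000ms: key 1 -> 4, key 2 -> 4
    }
}
```

With a grace of 0, key 2's earlier windows are already closed when its records arrive, because key 1 has advanced the shared stream time. A large grace (86,400,000 ms is 24 hours, which I believe is the 2.x default when no grace is set) keeps them open. That would fit the later observation that removing the grace period fixes the problem, but this is a sketch, not a diagnosis of the repro.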
>>> Does anyone have any idea what the problem could be?
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>
