kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: kafka-streams, late data, tumbling windows aggregations and event time
Date Mon, 05 Mar 2018 17:33:03 GMT
Hello Dmitriy,

What you have observed is by design, and it maybe a bit confusing at first
place. Let me explain:

When you do a group-by aggregation like the above case, during the
value) -> ......)" stage Streams library will do a re-partitioning by
sending the original data stream into an internal repartition topic based
on the aggregation key defined in the "groupBy" function and fetch from
that topic again. This is similar to a shuffle phase in distributed
computing frameworks to make sure the down stream aggregations can be done
in parallel. When the "groupBy" operator sends the messages to this
repartition topic, it will set in the record metadata the extracted
timestamp from the payload, and hence for the downstream aggregation
operator to read from this repartition topic, it is OK to always use
the ExtractRecordMetadataTimestamp
to extract that timestamp and use the extracted value to determine which
window this record should fall into.

More details can be found in this JIRA:


So the record timestamp used during aggregation should be the same as the
one in the payload, if you do observe that is not the case, this is
unexpected. In that case could you share your complete code snippet,
especially how input stream "in" is defined, and your config properties
defined for us to investigate?


On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com>

> Good morning,
> we have simple use-case where we want to count number of events by each
> hour grouped by some fields from event itself.
> Our event timestamp is embedded into messages itself (json) and we using
> trivial custom timestamp extractor (which called and works as expected).
> What we facing is that there is always timestamp used that coming
> from ExtractRecordMetadataTimestamp when determining matching windows for
> event, inside KStreamWindowAggregate.process() and never value from our
> json timestamp extractor.
> Effectively it doesn't work correctly if we test on late data, e.g.
> timestamp in a message hour ago from now for instance. Topology always
> calculating matching hour bucket (window) using record timestamp, not
> payload.
> Is it expected behaviour ? Are we getting windowing wrong? Any settings or
> other tricks to accommodate our use-case?
> For reference our setup: brokers, kafka-stream and kafka-clients all of
> v1.0.0
> And here is code:
> KTable<Windowed<Tuple>, Long> summaries = in
>    .groupBy((key, value) -> ......)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>    .count();
> Thank you.

-- Guozhang

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message