kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: kafka-streams, late data, tumbling windows aggregations and event time
Date Mon, 05 Mar 2018 20:18:26 GMT
Hello Dmitriy,

In your case, you can override this config to CreateTime only for the
internal topics created by Streams, this is documented in

https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX


We are also discussing to always override the log.message.timestamp.type
config for internal topics to CreateTime, I vaguely remember there is a
JIRA open for it in case you are interested in contributing to Streams
library.

Guozhang


On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <dvsekhvalnov@gmail.com
> wrote:

> Which effectively means given scenario is not working with LogAppendTime,
> correct? Because all internal re-partition topics will always contain "now"
> instead of real timestamp from original payload message?
>
> Is kafka-streams designed to work with LogAppendTime at all? It seems a lot
> of stuff will NOT work correctly using
> built-in ExtractRecordMetadataTimestamp ?
>
> On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > If broker configures log.message.timestamp.type=LogAppendTime
> universally,
> > it will ignore whatever timestamp set in the message metadata and
> override
> > it with the append time. So when the messages are fetched by downstream
> > processors which always use the metadata timestamp extractor, it will get
> > the append timestamp set by brokers.
> >
> >
> > Guozhang
> >
> > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > dvsekhvalnov@gmail.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > interesting, will same logic applies (internal topic rewrite) for
> brokers
> > > configured with:
> > >   log.message.timestamp.type=LogAppendTime
> > >
> > > ?
> > >
> > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangguoz@gmail.com>
> > wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > What you have observed is by design, and it maybe a bit confusing at
> > > first
> > > > place. Let me explain:
> > > >
> > > > When you do a group-by aggregation like the above case, during the
> > > > "groupBy((key,
> > > > value) -> ......)" stage Streams library will do a re-partitioning
by
> > > > sending the original data stream into an internal repartition topic
> > based
> > > > on the aggregation key defined in the "groupBy" function and fetch
> from
> > > > that topic again. This is similar to a shuffle phase in distributed
> > > > computing frameworks to make sure the down stream aggregations can be
> > > done
> > > > in parallel. When the "groupBy" operator sends the messages to this
> > > > repartition topic, it will set in the record metadata the extracted
> > > > timestamp from the payload, and hence for the downstream aggregation
> > > > operator to read from this repartition topic, it is OK to always use
> > > > the ExtractRecordMetadataTimestamp
> > > > to extract that timestamp and use the extracted value to determine
> > which
> > > > window this record should fall into.
> > > >
> > > > More details can be found in this JIRA:
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > >
> > > >
> > > > So the record timestamp used during aggregation should be the same as
> > the
> > > > one in the payload, if you do observe that is not the case, this is
> > > > unexpected. In that case could you share your complete code snippet,
> > > > especially how input stream "in" is defined, and your config
> properties
> > > > defined for us to investigate?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhvalnov@gmail.com>
> > > > wrote:
> > > >
> > > > > Good morning,
> > > > >
> > > > > we have simple use-case where we want to count number of events by
> > each
> > > > > hour grouped by some fields from event itself.
> > > > >
> > > > > Our event timestamp is embedded into messages itself (json) and we
> > > using
> > > > > trivial custom timestamp extractor (which called and works as
> > > expected).
> > > > >
> > > > > What we facing is that there is always timestamp used that coming
> > > > > from ExtractRecordMetadataTimestamp when determining matching
> windows
> > > for
> > > > > event, inside KStreamWindowAggregate.process() and never value
> from
> > > our
> > > > > json timestamp extractor.
> > > > >
> > > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > > timestamp in a message hour ago from now for instance. Topology
> > always
> > > > > calculating matching hour bucket (window) using record timestamp,
> not
> > > > > payload.
> > > > >
> > > > > Is it expected behaviour ? Are we getting windowing wrong? Any
> > settings
> > > > or
> > > > > other tricks to accommodate our use-case?
> > > > >
> > > > > For reference our setup: brokers, kafka-stream and kafka-clients
> all
> > of
> > > > > v1.0.0
> > > > > And here is code:
> > > > >
> > > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > > >    .groupBy((key, value) -> ......)
> > > > >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > > >    .count();
> > > > >
> > > > > Thank you.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message