kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
Subject Re: kafka-streams, late data, tumbling windows aggregations and event time
Date Tue, 06 Mar 2018 14:39:02 GMT
Guozhang,

here we go with ticket: https://issues.apache.org/jira/browse/KAFKA-6614

i'd also like to continue discussion little bit further about timestamps.
Was trying to test with broker configured "CreateTime" and got question
about sink topic timestamps, back to example:

KTable<Windowed<Tuple>, Long> summaries = in
   .groupBy((key, value) -> ......)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
   .count();

summaries.toStream().to(sink);

Each record written to sink will get timestamp assigned to grouping window
start time, which quite often will be in the past.

What the logic behind that? Honestly was expected sink messages to get
"now" timestamp.


On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Sounds great! :)
>
> On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <
> dvsekhvalnov@gmail.com
> > wrote:
>
> > Thanks, that's an option, i'll take a look at configuration.
> >
> > But yeah, i was thinking same, if streams relies on the fact that
> internal
> > topics should use 'CreateTime' configuration, then it is streams library
> > responsibility to configure it.
> >
> > I can open a Jira ticket :)
> >
> > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > In your case, you can override this config to CreateTime only for the
> > > internal topics created by Streams, this is documented in
> > >
> > > https://kafka.apache.org/10/javadoc/org/apache/kafka/
> > > streams/StreamsConfig.html#TOPIC_PREFIX
> > >
> > >
> > > We are also discussing to always override the
> log.message.timestamp.type
> > > config for internal topics to CreateTime, I vaguely remember there is a
> > > JIRA open for it in case you are interested in contributing to Streams
> > > library.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > > dvsekhvalnov@gmail.com
> > > > wrote:
> > >
> > > > Which effectively means given scenario is not working with
> > LogAppendTime,
> > > > correct? Because all internal re-partition topics will always contain
> > > "now"
> > > > instead of real timestamp from original payload message?
> > > >
> > > > Is kafka-streams designed to work with LogAppendTime at all? It
> seems a
> > > lot
> > > > of stuff will NOT work correctly using
> > > > built-in ExtractRecordMetadataTimestamp ?
> > > >
> > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangguoz@gmail.com>
> > > wrote:
> > > >
> > > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > > universally,
> > > > > it will ignore whatever timestamp set in the message metadata and
> > > > override
> > > > > it with the append time. So when the messages are fetched by
> > downstream
> > > > > processors which always use the metadata timestamp extractor, it
> will
> > > get
> > > > > the append timestamp set by brokers.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > > dvsekhvalnov@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > interesting, will same logic applies (internal topic rewrite)
for
> > > > brokers
> > > > > > configured with:
> > > > > >   log.message.timestamp.type=LogAppendTime
> > > > > >
> > > > > > ?
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <
> wangguoz@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello Dmitriy,
> > > > > > >
> > > > > > > What you have observed is by design, and it maybe a bit
> confusing
> > > at
> > > > > > first
> > > > > > > place. Let me explain:
> > > > > > >
> > > > > > > When you do a group-by aggregation like the above case,
during
> > the
> > > > > > > "groupBy((key,
> > > > > > > value) -> ......)" stage Streams library will do a
> > re-partitioning
> > > by
> > > > > > > sending the original data stream into an internal repartition
> > topic
> > > > > based
> > > > > > > on the aggregation key defined in the "groupBy" function
and
> > fetch
> > > > from
> > > > > > > that topic again. This is similar to a shuffle phase in
> > distributed
> > > > > > > computing frameworks to make sure the down stream aggregations
> > can
> > > be
> > > > > > done
> > > > > > > in parallel. When the "groupBy" operator sends the messages
to
> > this
> > > > > > > repartition topic, it will set in the record metadata the
> > extracted
> > > > > > > timestamp from the payload, and hence for the downstream
> > > aggregation
> > > > > > > operator to read from this repartition topic, it is OK
to
> always
> > > use
> > > > > > > the ExtractRecordMetadataTimestamp
> > > > > > > to extract that timestamp and use the extracted value to
> > determine
> > > > > which
> > > > > > > window this record should fall into.
> > > > > > >
> > > > > > > More details can be found in this JIRA:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > > >
> > > > > > >
> > > > > > > So the record timestamp used during aggregation should
be the
> > same
> > > as
> > > > > the
> > > > > > > one in the payload, if you do observe that is not the case,
> this
> > is
> > > > > > > unexpected. In that case could you share your complete
code
> > > snippet,
> > > > > > > especially how input stream "in" is defined, and your config
> > > > properties
> > > > > > > defined for us to investigate?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > > > dvsekhvalnov@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Good morning,
> > > > > > > >
> > > > > > > > we have simple use-case where we want to count number
of
> events
> > > by
> > > > > each
> > > > > > > > hour grouped by some fields from event itself.
> > > > > > > >
> > > > > > > > Our event timestamp is embedded into messages itself
(json)
> and
> > > we
> > > > > > using
> > > > > > > > trivial custom timestamp extractor (which called and
works as
> > > > > > expected).
> > > > > > > >
> > > > > > > > What we facing is that there is always timestamp used
that
> > coming
> > > > > > > > from ExtractRecordMetadataTimestamp when determining
matching
> > > > windows
> > > > > > for
> > > > > > > > event, inside KStreamWindowAggregate.process() and
never
> value
> > > > from
> > > > > > our
> > > > > > > > json timestamp extractor.
> > > > > > > >
> > > > > > > > Effectively it doesn't work correctly if we test on
late
> data,
> > > e.g.
> > > > > > > > timestamp in a message hour ago from now for instance.
> Topology
> > > > > always
> > > > > > > > calculating matching hour bucket (window) using record
> > timestamp,
> > > > not
> > > > > > > > payload.
> > > > > > > >
> > > > > > > > Is it expected behaviour ? Are we getting windowing
wrong?
> Any
> > > > > settings
> > > > > > > or
> > > > > > > > other tricks to accommodate our use-case?
> > > > > > > >
> > > > > > > > For reference our setup: brokers, kafka-stream and
> > kafka-clients
> > > > all
> > > > > of
> > > > > > > > v1.0.0
> > > > > > > > And here is code:
> > > > > > > >
> > > > > > > > KTable<Windowed<Tuple>, Long> summaries
= in
> > > > > > > >    .groupBy((key, value) -> ......)
> > > > > > > >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > > > > > >    .count();
> > > > > > > >
> > > > > > > > Thank you.
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message