kafka-users mailing list archives

From Garrett Barton <garrett.bar...@gmail.com>
Subject Re: Verify time semantics through topology
Date Sat, 06 May 2017 01:41:55 GMT
Matthias,
 That does make a lot of sense: Streams never creates time on its own, it
always derives it from the record time passed into it.  Thus, in theory,
unless I force a change somewhere in a flow, the flow will keep the time
semantics I start it with.

The confusing part is around joins, since 'stream time' is loosely derived
from where Kafka Streams thinks it is globally while consuming the upstream
topics, and this is where the timing can get out of sync.  It did break my
original flow after a few minutes, every single time.  That makes me think
that in a join the window and the until() retention should probably be the
same value, given that the streams could be off by quite a bit.  But that is
another topic.

 I redid my stream as you suggested and it worked wonderfully: it shrank the
flows considerably, and I can finally calculate averages consistently for
longer than a few minutes. Thank you!

On Fri, May 5, 2017 at 1:06 PM, Matthias J. Sax <matthias@confluent.io>
wrote:

> That part of time tracking is a little tricky.
>
> Streams internally maintains "stream time" -- this models the progress of
> your application over all input partitions and topics, and is based on
> the timestamps returned by the timestamp extractor. Thus, if the timestamp
> extractor returns event time, "stream time" will be event-time based,
> too. (Streams never calls System.currentTimeMillis() to assign
> timestamps.)
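>
> For illustration, a custom extractor might look roughly like this (a minimal
> sketch only; the exact extract() signature differs slightly across Streams
> versions, and MyEvent / getEventTime() are made-up names):
>
>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>   import org.apache.kafka.streams.processor.TimestampExtractor;
>
>   public class EventTimeExtractor implements TimestampExtractor {
>       @Override
>       public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
>           // Prefer the event time carried inside the payload ...
>           if (record.value() instanceof MyEvent) {
>               return ((MyEvent) record.value()).getEventTime();
>           }
>           // ... otherwise fall back to the record's own (producer/broker) timestamp.
>           return record.timestamp();
>       }
>   }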
>
> This internally tracked "stream time" is used in punctuate() (yes, low
> level API only) and for window operations to define the output record's
> timestamp. As "stream time" depends on record processing order, it might
> vary a little bit (the computation itself is deterministic, but it depends
> on which records get fetched from the brokers, and the fetching step is
> not deterministic, making the "global" processing order non-deterministic,
> too -- which is a general Kafka property: order is only guaranteed within
> a single partition, but not across partitions). This slight variation in
> the "stream time" computation might break the join step in your original
> code... You would need to base the join on window-start time and not on
> event time to get it right (and then you would not even need a windowed
> join). But that join is too "clumsy" anyway.
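>
> (Just to illustrate the window-start idea, a rough sketch; "countTable" is a
> made-up name for a windowed count KTable<Windowed<String>, Long>, and the
> exact DSL calls depend on your Streams version:)
>
>   import org.apache.kafka.streams.KeyValue;
>   import org.apache.kafka.streams.kstream.KStream;
>
>   // Re-key each windowed result by "<key>@<windowStart>" so that downstream
>   // combination lines up per window, independent of the (non-deterministic)
>   // processing order.
>   KStream<String, Long> countsPerWindow = countTable
>       .toStream()
>       .map((windowedKey, count) -> KeyValue.pair(
>           windowedKey.key() + "@" + windowedKey.window().start(),
>           count));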
>
> Does this answer all your questions?
>
> (We don't document those details on purpose, because it's an internal
> design and we want the flexibility to change this if required -- thus,
> you should also not rely on assumptions about how "stream time" advances
> in your code.)
>
>
> -Matthias
>
>
> On 5/5/17 8:09 AM, Garrett Barton wrote:
> > That does, actually. I never thought about a custom value object to hold
> > the Count/Sum variables. Thank you!
> >
> > For the time semantics, here is where I got hung up, copied from the Kafka
> > Streams documentation:
> >
> > Finally, whenever a Kafka Streams application writes records to Kafka, then
> > it will also assign timestamps to these new records. The way the timestamps
> > are assigned depends on the context:
> >
> >    - When new output records are generated via processing some input
> >    record, for example, context.forward() triggered in the process()
> >    function call, output record timestamps are inherited from input
> >    record timestamps directly.
> >       - *Given I set things to Event Time, this would output Event Time,
> >       correct?*
> >    - When new output records are generated via periodic functions such
> >    as punctuate(), the output record timestamp is defined as the current
> >    internal time (obtained through context.timestamp()) of the stream
> >    task.
> >       - *This is where I am confused: what operations count as a
> >       punctuate()? Just the low-level API? And are these thus Process
> >       Time? (See the sketch after this list.)*
> >    - For aggregations, the timestamp of a resulting aggregate update
> >    record will be that of the latest arrived input record that triggered
> >    the update.
> >       - *This sounds like the last-used Event Time, correct?*
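> >
> > For context, here is roughly how I picture the low-level punctuate() hook
> > (just a sketch against the 0.10.x-era Processor API; newer versions use a
> > Punctuator callback instead, and MyProcessor is a made-up name):
> >
> >   import org.apache.kafka.streams.processor.Processor;
> >   import org.apache.kafka.streams.processor.ProcessorContext;
> >
> >   public class MyProcessor implements Processor<String, Long> {
> >       private ProcessorContext context;
> >
> >       @Override
> >       public void init(ProcessorContext context) {
> >           this.context = context;
> >           // Request punctuate() roughly once per minute of internal stream time.
> >           context.schedule(60_000L);
> >       }
> >
> >       @Override
> >       public void process(String key, Long value) { /* update some state */ }
> >
> >       @Override
> >       public void punctuate(long timestamp) {
> >           // 'timestamp' is the internally tracked stream time, not wall-clock time;
> >           // records forwarded here get the task's current internal timestamp.
> >           context.forward("some-key", 123L); // illustrative only
> >       }
> >
> >       @Override
> >       public void close() {}
> >   }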
> >
> >
> > On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> I am not sure if I understand correctly: if you use the default
> >> TimestampExtractor, the whole pipeline will be event-time based.
> >>
> >> However, as you want to compute the AVG, I would recommend a different
> >> pattern anyway:
> >>
> >> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
> >>
> >> In aggregate(), you compute both count and sum and emit <k,(cnt,sum)>
> >> records (i.e., a custom data type for the value), and in mapValues() you
> >> compute <k,avg>.
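> >>
> >> A rough sketch of that pattern (names, serdes, and the windowing calls are
> >> illustrative only; the exact DSL differs a bit between Streams versions,
> >> and countAndSumSerde is an assumed custom serde):
> >>
> >>   import java.time.Duration;
> >>   import org.apache.kafka.common.serialization.Serdes;
> >>   import org.apache.kafka.streams.StreamsBuilder;
> >>   import org.apache.kafka.streams.kstream.*;
> >>
> >>   // Custom value type holding the running count and sum.
> >>   public class CountAndSum {
> >>       public long count;
> >>       public double sum;
> >>   }
> >>
> >>   StreamsBuilder builder = new StreamsBuilder();
> >>   KStream<String, Double> feed =
> >>       builder.stream("FEED", Consumed.with(Serdes.String(), Serdes.Double()));
> >>
> >>   KTable<Windowed<String>, CountAndSum> countAndSum = feed
> >>       .groupByKey()
> >>       .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
> >>       .aggregate(
> >>           CountAndSum::new,
> >>           (key, value, agg) -> { agg.count += 1; agg.sum += value; return agg; },
> >>           Materialized.with(Serdes.String(), countAndSumSerde));
> >>
> >>   KTable<Windowed<String>, Double> avgKTable =
> >>       countAndSum.mapValues(agg -> agg.sum / agg.count);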
> >>
> >> Hope this helps.
> >>
> >> -Matthias
> >>
> >> On 5/4/17 7:36 PM, Garrett Barton wrote:
> >>> I think I have an understanding of how Kafka Streams is handling time
> >>> behind the scenes and would like someone to verify it for me.  The actual
> >>> reason is I am running into behavior where I can only join two streams
> >>> for a little while, then it stops working.
> >>>
> >>> Assuming a topology like this:
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = countKStream
> >>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = sumKStream
> >>>
> >>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> >>> reduce() = avgKTable
> >>>
> >>> Given that FEED is populated into Kafka with the event time for the
> >>> timestamp (and, just to make sure, I have a TimestampExtractor extracting
> >>> the time again), I believe time processing happens like this (ET = Event
> >>> Time, PT = Process Time):
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() = countKStream
> >>> ET -> ET -> ET -> PT
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() = sumKStream
> >>> ET -> ET -> ET -> PT
> >>>
> >>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> >>> reduce() = avgKTable
> >>> PT -> PT -> PT
> >>>
> >>> Thus my join has really attempted to join records based on Kafka's
> >>> processing time from the previous aggregations and not by event time like
> >>> I want.  When the streams start, things seem to work well and the avg
> >>> topic/stores populate.  After a few minutes, count gets way ahead of sum
> >>> and then avg completely stops populating anything.  My hunch is that the
> >>> processing time gets outside that 1-minute join window and it no longer
> >>> joins; increasing the until to any number (I tried 1 year) has no effect
> >>> either.
> >>>
> >>> Is this the correct way to calculate an average over a 1-minute event-time
> >>> window with, say, a 14-day lag time (to load old data)?
> >>>
> >>> Thank you all!
> >>>
> >>
> >>
> >
>
>
