kafka-users mailing list archives

From Garrett Barton <garrett.bar...@gmail.com>
Subject Re: Verify time semantics through topology
Date Fri, 05 May 2017 15:09:42 GMT
That helps, actually. I never thought about a custom value object to hold
the count and sum variables. Thank you!
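The custom value object could look something like this (a minimal sketch in plain Java; the class and method names are mine, and the Kafka Streams wiring is left out):

```java
// Sketch of a count/sum holder for computing an average downstream.
// In the Streams DSL, aggregate() would fold each record into this
// object and mapValues() would call avg() on the result.
public class CountSum {
    private long count = 0;
    private double sum = 0.0;

    // Fold one value into the running aggregate.
    public CountSum add(double value) {
        count++;
        sum += value;
        return this;
    }

    // Derive the average; guard against an empty aggregate.
    public double avg() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        CountSum cs = new CountSum();
        cs.add(2.0).add(4.0).add(6.0);
        System.out.println(cs.avg()); // prints 4.0
    }
}
```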

For the time semantics here is where I got hung up, copied from kafka
streams documentation:

Finally, whenever a Kafka Streams application writes records to Kafka, then
it will also assign timestamps to these new records. The way the timestamps
are assigned depends on the context:

   - When new output records are generated via processing some input
     record, for example, context.forward() triggered in the process()
     function call, output record timestamps are inherited from input
     record timestamps directly.
       - *Given I set things to event time, this would output event time,
         correct?*
   - When new output records are generated via periodic functions such as
     punctuate(), the output record timestamp is defined as the current
     internal time (obtained through context.timestamp()) of the stream
     task.
       - *This is where I am confused: what operations count as a
         punctuate()? Just the low-level API? And are these thus
         processing time?*
   - For aggregations, the timestamp of a resulting aggregate update
     record will be that of the latest arrived input record that
     triggered the update.
       - *This sounds like the last-used event time, correct?*
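On the windowing side, the key point is that a tumbling window buckets each record by its record timestamp, which under the default TimestampExtractor is the event time. A hedged sketch of that bucketing arithmetic (my own helper, not the Kafka Streams API):

```java
// My own illustration, not the Kafka Streams API: a tumbling window
// assigns each record to the window containing its record timestamp.
// With the default TimestampExtractor that timestamp is event time, so
// records from two streams derived from the same input land in the same
// window regardless of when they are actually processed.
public class WindowBucket {
    static final long WINDOW_SIZE_MS = 60_000L; // 1-minute tumbling window

    // Start of the tumbling window containing the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        System.out.println(windowStart(120_500L)); // prints 120000
        System.out.println(windowStart(179_999L)); // prints 120000
        System.out.println(windowStart(180_000L)); // prints 180000
    }
}
```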


On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Hi,
>
> I am not sure if I understand correctly:  If you use default
> TimestampExtractor, the whole pipeline will be event-time based.
>
> However, as you want to compute the AVG, I would recommend a different
> pattern anyway:
>
> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
>
> In aggregate(), you compute both count and sum and emit <k,(cnt,sum)>
> records (i.e., a custom data type for the value), and in mapValues() you
> compute <k,avg>.
>
> Hope this helps.
>
> -Matthias
>
> On 5/4/17 7:36 PM, Garrett Barton wrote:
> > I think I have an understanding of how Kafka Streams is handling time
> > behind the scenes and would like someone to verify it for me.  The actual
> > reason is I am running into behavior where I can only join two streams
> > for a little while, then it stops working.
> >
> > Assuming a topology like this:
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> > countKStream.
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> > sumKStream.
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> >
> > Given that FEED is populated into Kafka with the event time for the
> > timestamp (and, just to make sure, I have a TimestampExtractor
> > extracting the time again), I believe time processing happens like
> > this (ET = Event Time, PT = Process Time):
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> > countKStream.
> > ET -> ET -> ET -> PT
> >
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> > sumKStream.
> > ET -> ET -> ET -> PT
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> > PT -> PT -> PT
> >
> > Thus my join has really attempted to join records based on Kafka's
> > processing time from the previous aggregations and not by event time
> > like I want.  When the streams start, things seem to work well, and the
> > avg topic/stores populate.  After a few minutes, count gets way ahead
> > of sum and then avg completely stops populating anything.  My hunch is
> > that the processing time gets outside that 1-minute join window and it
> > no longer joins; increasing the until to any value (I tried 1 year)
> > has no effect either.
> >
> > Is this the correct way to calculate an average over a 1-minute
> > event-time window with, say, a 14-day lag time (to load old data)?
> >
> > Thank you all!
> >
>
>
