kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Verify time semantics through topology
Date Fri, 05 May 2017 05:16:21 GMT
Hi,

I am not sure if I understand correctly: if you use the default
TimestampExtractor, the whole pipeline will be event-time based.
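
For reference, here is roughly what a payload-based extractor looks like
(a minimal sketch against the 0.10.2-era API; FeedEvent and its
eventTimeMs field are hypothetical stand-ins for your value type). You
only need this if the event time lives inside the payload rather than in
the record's embedded timestamp:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical value type; stands in for whatever is on the FEED topic.
class FeedEvent {
    long eventTimeMs;
}

public class FeedTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        // Pull event time out of the payload instead of the record metadata.
        return ((FeedEvent) record.value()).eventTimeMs;
    }

    // Register the extractor when configuring the application. (In newer
    // versions the config is named "default.timestamp.extractor".)
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  FeedTimestampExtractor.class.getName());
        return props;
    }
}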

However, as you want to compute the AVG, I would recommend a different
pattern anyway:

FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable

In aggregate(), you compute both count and sum and emit <k,(cnt,sum)>
records (i.e., a custom data type for the value), and in mapValues() you
compute <k,avg>.
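
Something like this (a rough sketch against the 0.10.2-era DSL; the
CountAndSum helper, the serde for it, and the store name are illustrative,
not required names):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Helper value type that carries both parts of the aggregate together.
class CountAndSum {
    long cnt = 0L;
    double sum = 0.0;

    CountAndSum add(final double v) {
        cnt++;
        sum += v;
        return this;
    }
}

class AvgExample {
    // You need to provide a custom serde for the helper type.
    static KTable<Windowed<String>, Double> buildAvg(
            final KStreamBuilder builder,
            final Serde<CountAndSum> countAndSumSerde) {
        final KStream<String, Double> feed =
            builder.stream(Serdes.String(), Serdes.Double(), "FEED");
        return feed
            .groupByKey(Serdes.String(), Serdes.Double())
            .aggregate(
                CountAndSum::new,                    // initializer: cnt=0, sum=0
                (key, value, agg) -> agg.add(value), // count and sum move together
                TimeWindows.of(60_000L),             // 1-minute event-time windows
                countAndSumSerde,
                "count-and-sum-store")
            .mapValues(cs -> cs.sum / cs.cnt);       // <k,(cnt,sum)> -> <k,avg>
    }
}

Because count and sum travel through the topology in a single aggregate,
there is no downstream join whose window could misalign.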

Hope this helps.

-Matthias

On 5/4/17 7:36 PM, Garrett Barton wrote:
> I think I have an understanding of how Kafka Streams handles time
> behind the scenes and would like someone to verify it for me.  The actual
> reason is that I am running into behavior where I can only join two
> streams for a little while, then it stops working.
> 
> Assuming a topology like this:
> 
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
> 
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
> 
> Given that FEED is populated into Kafka with the event time as the
> timestamp (and, just to make sure, I have a TimestampExtractor extracting
> the time again), I believe time processing happens like this (ET = Event
> Time, PT = Process Time):
> 
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> ET -> ET -> ET -> PT
> 
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
> ET -> ET -> ET -> PT
> 
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
> PT -> PT -> PT
> 
> Thus my join has really attempted to join records based on Kafka's
> processing time from the previous aggregations, and not by event time like
> I want.  When the streams start, things seem to work well and the avg
> topic/stores populate.  After a few minutes, count gets way ahead of sum
> and then avg completely stops populating anything.  My hunch is that the
> processing time gets outside that 1-minute join window and it no longer
> joins; increasing the until to any number (I tried 1 year) has no effect
> either.
> 
> Is this the correct way to calculate an average over a 1-minute
> event-time window with, say, a 14-day lag time (to load old data)?
> 
> Thank you all!
> 

