kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Verify time semantics through topology
Date Fri, 05 May 2017 17:06:03 GMT
That part of time tracking is a little tricky.

Streams internally maintains "stream time" -- this models the progress of
your application over all input partitions and topics, and it is based on
the timestamps returned by the timestamp extractor. Thus, if the timestamp
extractor returns event time, "stream time" will be event-time based, too.
(Streams never calls System.currentTimeMillis() to assign timestamps.)
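
For illustration, a custom extractor that pulls the event time out of the
record value could look roughly like this (MyEvent and getEventTimeMs() are
made-up names; this assumes the two-argument extract() of newer Streams
versions -- older ones take only the record):

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.streams.processor.TimestampExtractor;

  public class EventTimeExtractor implements TimestampExtractor {
      @Override
      public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
          // MyEvent / getEventTimeMs() are hypothetical: pull the event time out
          // of the value so "stream time" advances on event time, not wall-clock time
          MyEvent event = (MyEvent) record.value();
          return event.getEventTimeMs();
      }
  }

  // registered via:
  // props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class);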

This internally tracked "stream time" is used in punctuate() (yes, low-level
API only) and for window operations to define the output record's
timestamp. As "stream time" depends on record processing order, it might
vary a little bit (the computation itself is deterministic, but it depends
on what records get fetched from the brokers, and the fetching step is not
deterministic, making the "global" processing order non-deterministic,
too -- this is a general Kafka property: order is only guaranteed within a
single partition, but not across partitions). This slight variation in the
"stream time" computation might break the join step in your original
code... You would need to base the join on window-start time and not on
event time to get it right (and then you would not even need a windowed
join). But the join is too "clumsy" anyway.
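
Just to make the suggestion from my earlier mail concrete, here is a rough
sketch of the no-join average with the Streams DSL (CountAndSum,
countAndSumSerde, and the store name are made up; you would plug in your
own types and serdes):

  import java.util.concurrent.TimeUnit;
  import org.apache.kafka.common.serialization.Serdes;
  import org.apache.kafka.streams.kstream.*;

  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, Double> feed =
      builder.stream(Serdes.String(), Serdes.Double(), "FEED");

  KTable<Windowed<String>, CountAndSum> aggregated = feed
      .groupByKey(Serdes.String(), Serdes.Double())
      .aggregate(
          CountAndSum::new,                               // initializer: cnt=0, sum=0
          (key, value, agg) -> agg.add(value),            // adder: update cnt and sum
          TimeWindows.of(TimeUnit.MINUTES.toMillis(1)),   // 1-minute event-time windows
          countAndSumSerde,                               // serde for the custom value type
          "avg-agg-store");

  // derive the average from the single aggregate -- no windowed join needed
  KStream<Windowed<String>, Double> avg = aggregated
      .mapValues(cs -> cs.sum / cs.cnt)
      .toStream();

where CountAndSum is just a small value class like

  public class CountAndSum {
      public long cnt = 0;
      public double sum = 0.0;
      public CountAndSum add(double v) { cnt++; sum += v; return this; }
  }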

Does this answer all your questions?

(We don't document those details on purpose, because it's an internal
design and we want the flexibility to change it if required -- thus, you
should also not rely on any assumptions about how "stream time" advances
in your code.)
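
And regarding the punctuate() question from your mail: yes, punctuate()
exists only in the low-level Processor API, and the timestamp it sees is
the internally tracked "stream time", not wall-clock/processing time. A
minimal sketch (the processor and its names are made up, just for
illustration):

  import org.apache.kafka.streams.processor.AbstractProcessor;
  import org.apache.kafka.streams.processor.ProcessorContext;

  public class MyProcessor extends AbstractProcessor<String, Double> {

      @Override
      public void init(ProcessorContext context) {
          super.init(context);
          context.schedule(60_000L);  // punctuate about once per minute of "stream time"
      }

      @Override
      public void process(String key, Double value) {
          // context().timestamp() is the extracted (event-time) timestamp of the
          // current input record; records forwarded here inherit that timestamp
          context().forward(key, value);
      }

      @Override
      public void punctuate(long streamTime) {
          // streamTime is the internally tracked "stream time" -- event-time based
          // if your extractor returns event time; never System.currentTimeMillis()
      }
  }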


-Matthias


On 5/5/17 8:09 AM, Garrett Barton wrote:
> That does, actually. I never thought about a custom value object to hold
> the Count/Sum variables. Thank you!
> 
> For the time semantics, here is where I got hung up (copied from the Kafka
> Streams documentation):
> 
> Finally, whenever a Kafka Streams application writes records to Kafka, then
> it will also assign timestamps to these new records. The way the timestamps
> are assigned depends on the context:
> 
>    - When new output records are generated via processing some input
>    record, for example, context.forward() triggered in the process()
>    function call, output record timestamps are inherited from input record
>    timestamps directly.
>       - *Given I set things to Event Time, this would output Event Time,
>       correct?*
>    - When new output records are generated via periodic functions such
>    as punctuate(), the output record timestamp is defined as the current
>    internal time (obtained through context.timestamp()) of the stream task.
>       - *This is where I am confused: what operations count as a
>       punctuate()? Just the low-level API? And are these thus Process Time?*
>    - For aggregations, the timestamp of a resulting aggregate update
>    record will be that of the latest arrived input record that triggered the
>    update.
>       - *This sounds like the last-used Event Time, correct?*
> 
> 
> On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> Hi,
>>
>> I am not sure if I understand correctly: if you use the default
>> TimestampExtractor, the whole pipeline will be event-time based.
>>
>> However, as you want to compute the AVG, I would recommend a different
>> pattern anyway:
>>
>> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
>>
>> In aggregate(), you compute both count and sum and emit <k,(cnt,sum)>
>> records (i.e., a custom data type for the value), and in mapValues() you
>> compute <k,avg>.
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 5/4/17 7:36 PM, Garrett Barton wrote:
>>> I think I have an understanding of how Kafka Streams is handling time
>>> behind the scenes and would like someone to verify it for me.  The actual
>>> reason is I am running into behavior where I can only join two streams for
>>> a little while, then it stops working.
>>>
>>> Assuming a topology like this:
>>>
>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>> countKStream.
>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>> sumKStream.
>>>
>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>> reduce() = avgKTable
>>>
>>> Given that FEED is populated into Kafka with the event time for the
>>> timestamp (and, just to make sure, I have a TimestampExtractor extracting
>>> the time again), I believe time processing happens like this (ET = Event
>>> Time, PT = Process Time):
>>>
>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>> countKStream.
>>> ET -> ET -> ET -> PT
>>>
>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>> sumKStream.
>>> ET -> ET -> ET -> PT
>>>
>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>> reduce() = avgKTable
>>> PT -> PT -> PT
>>>
>>> Thus my join has really attempted to join records based on Kafka's
>>> processing time from the previous aggregations and not by event time like
>>> I want.  When the streams start, things seem to work well and the avg
>>> topic/stores populate.  After a few minutes count gets way ahead of sum
>>> and then avg completely stops populating anything.  My hunch is that the
>>> processing time gets outside that 1 minute join window and it no longer
>>> joins; increasing the until to any number (I tried 1 year) has no effect
>>> either.
>>>
>>> Is this the correct way to calculate an average over a 1 minute event time
>>> window with, say, a 14 day lag time (to load old data)?
>>>
>>> Thank you all!
>>>
>>
>>
> 

