kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Verify time semantics through topology
Date Sat, 06 May 2017 21:49:08 GMT
About the join:

Joins work perfectly fine if you apply them to "plain records" you read
from a topic. When joining records, the records timestamp is used to
compute the join result.

The "problem" in your case is that you apply the join to a windowed
aggregation result. And thus, there is no "record timestamp" and Streams
falls back to "stream time" to assign a timestamp to the window result
record that can be used for the join.

Glad it works with my suggested solution. :)

-Matthias

On 5/5/17 6:41 PM, Garrett Barton wrote:
> Matthias,
>  That does make a lot of sense, so Streams never will create time its
> always using a byproduct of a record time passed into it.  Thus in theory
> unless I force a change somewhere in a flow, the flow will stay as I start
> it.
> 
> The confusing part is around joins, since 'stream time' is kinda loosely
> derived from where kafka streams thinks it is globally from consuming the
> upstream topic, and this is where the timing can get out of sync.  And it
> did break my original flow after a few minutes every single time.  That
> part kind of makes me think that in a join the window and until likely
> should be the same value, given that the streams could be off quite a bit.
> But that is another topic.
> 
>  I redid my stream as you suggested and it worked wonderfully, shrunk the
> flows considerably, and I can finally calculate averages consistently
> longer than a few minutes. Thank you!
> 
> On Fri, May 5, 2017 at 1:06 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> That part of time tracking is a little tricky.
>>
>> Streams internally maintains "stream time" -- this model the progress of
>> your application over all input partitions and topics, and is based on
>> the timestamps return by the timestamp extractor. Thus, if timestamp
>> extractor returns even time, "stream time" will we event-time based,
>> too. (Streams, never calls System.currentTimeMillis() so assign
>> timestamps.)
>>
>> This internally tracked "stream time" is used in punctuate() (yes, low
>> level API only) and for window operations to define the output record's
>> timestamp. As "stream time" depends on record processing order, it might
>> vary a little bit (the computation of it itself is deterministic, but it
>> depends what records get fetched from the brokers, and the fetching step
>> is not deterministic, making "global" processing order
>> non-deterministic, too -- what is a general Kafka property: order is
>> only guaranteed within a single partitions, but not across partitions).
>> This little varying in "stream time" computation might break you join
>> step in your original code... You would need to base the join on
>> window-start time and not on event-time to get it right (and thus, you
>> would not even need a windowed join). But the join is to "clumsy" anyway.
>>
>> Does this answer all your questions?
>>
>> (We don't document those details on purpose, because it's an internal
>> design and we want the flexibility to change this if required -- thus,
>> you should also not rely on "stream time" advance assumptions in your
>> code.)
>>
>>
>> -Matthias
>>
>>
>> On 5/5/17 8:09 AM, Garrett Barton wrote:
>>> That does actually, I never thought about a custom value object to hold
>> the
>>> Count/Sum variables. Thank you!
>>>
>>> For the time semantics here is where I got hung up, copied from kafka
>>> streams documentation:
>>>
>>> Finally, whenever a Kafka Streams application writes records to Kafka,
>> then
>>> it will also assign timestamps to these new records. The way the
>> timestamps
>>> are assigned depends on the context:
>>>
>>>    - When new output records are generated via processing some input
>>>    record, for example, context.forward() triggered in the process()
>>>    function call, output record timestamps are inherited from input
>> record
>>>    timestamps directly.
>>>       - *Given I set things to Event Time, this would output Event Time
>>>       correct?*
>>>       - When new output records are generated via periodic functions such
>>>    as punctuate(), the output record timestamp is defined as the current
>>>    internal time (obtained through context.timestamp()) of the stream
>> task.
>>>       - *This is where I am confused, what operations count as a
>>>       punctuate()? Just the low level api? And are these thus Process
>> time?*
>>>       - For aggregations, the timestamp of a resulting aggregate update
>>>    record will be that of the latest arrived input record that triggered
>> the
>>>    update.
>>>       - *This sounds like last used Event Time, correct?*
>>>
>>>
>>> On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <matthias@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am not sure if I understand correctly:  If you use default
>>>> TimestampExtractor, the whole pipeline will be event-time based.
>>>>
>>>> However, as you want to compute the AVG, I would recommend a different
>>>> pattern anyway:
>>>>
>>>> FEED -> groupByKey() -> window() -> aggregate() -> mapValues()
=
>> avgKTable
>>>>
>>>> In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
>>>> records (ie, a custom data data for value) and in mapValue() you compute
>>>> <k,avg>.
>>>>
>>>> Hope this helps.
>>>>
>>>> -Matthias
>>>>
>>>> On 5/4/17 7:36 PM, Garrett Barton wrote:
>>>>> I think I have an understanding of how Kafka Streams is handling time
>>>>> behind the scenes and would like someone to verify it for me.  The
>> actual
>>>>> reason is I am running into behavior where I only can join two streams
>>>> for
>>>>> a little, then it stops working.
>>>>>
>>>>> Assuming a topology like this:
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream()
=
>>>>> countKStream.
>>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream()
=
>>>>> sumKStream.
>>>>>
>>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month)
->
>>>>> reduce() = avgKTable
>>>>>
>>>>> Given that FEED is populated into kafka with the event time for the
>>>>> timestamp (and just to make sure I have a TimeExtractor extracting the
>>>> time
>>>>> again), I believe time processing happens like this (ET = Event Time,
>> PT
>>>> =
>>>>> Process Time):
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream()
=
>>>>> countKStream.
>>>>> ET -> ET -> ET -> PT
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream()
=
>>>>> sumKStream.
>>>>> ET -> ET -> ET -> PT
>>>>>
>>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month)
->
>>>>> reduce() = avgKTable
>>>>> PT -> PT -> PT
>>>>>
>>>>> Thus my join has really attempted to join records based on kafka's
>>>>> processing time from the previous aggregations and not by event time
>>>> like I
>>>>> want.  When the streams start things seem to work well, avg
>> topic/stores
>>>>> populate.  After a few minutes count gets way ahead of sum and then avg
>>>>> completely stops populating anything.  My hunch is that the processing
>>>> time
>>>>> gets outside that 1 minute join window and it no longer joins,
>> increasing
>>>>> the until to any number (tried 1 year) has no effect either.
>>>>>
>>>>> Is this the correct way to calculate an average over a 1 minute event
>>>> time
>>>>> window with say a 14 day lag time (to load old data)?
>>>>>
>>>>> Thank you all!
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Mime
View raw message