kafka-users mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: How to chain increasing window operations one after another
Date Tue, 09 May 2017 07:14:51 GMT
> This seems to be a question that might affect many users, so it might
> be worth documenting it somewhere as a recommended pattern.
I was thinking the same thing :)

How about a page on the wiki listing useful patterns, with subpages for 
each pattern in detail? (like for KIPs)

Thanks,

Michał


On 08/05/17 22:26, Matthias J. Sax wrote:
> Michal,
>
> that's an interesting idea. In an ideal world, Kafka Streams would have
> an optimizer that is able to do this automatically under the hood. Too
> bad we are not there yet.
>
> @Garret: did you try this out?
>
> This seems to be a question that might affect many users, so it might
> be worth documenting it somewhere as a recommended pattern.
>
>
> -Matthias
>
>
> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
>> Apologies,
>>
>> In the code snippet, of course, only the oneMinuteWindowed KTable will
>> have a Windowed key (KTable<Windowed<Key>, Value>); all the others would
>> be just KTable<Tuple2<Key, Long>, Value>.
>>
>> Michał
>>
>> On 07/05/17 16:09, Michal Borowiecki wrote:
>>> Hi Garrett,
>>>
>>> I've encountered a similar challenge in a project I'm working on (it's
>>> still work in progress, so please take my suggestions with a grain of
>>> salt).
>>>
>>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>>> for with something like the following (same snippet attached as txt file):
>>>
>>>
>>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream    //
>>> where Key and Value stand for your actual key and value types
>>>
>>>      .groupByKey()
>>>
>>>      .reduce(/*your adder*/, TimeWindows.of(60 * 1000L), "store1m");
>>>
>>>          // where your adder can be as simple as (val, agg) -> agg + val
>>>
>>>          // for primitive types, or as complex as you need
>>>
>>> KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed =
>>> oneMinuteWindowed    // Tuple2 for this example as defined by the
>>> javaslang library
>>>
>>>      .groupBy( (windowedKey, value) -> new KeyValue<>(new
>>> Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
>>> *1000*60*5), value))
>>>
>>>          // the above rounds the timestamp down to a multiple of 5
>>> minutes
>>>
>>>      .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>
>>>          // where your subtractor can be as simple as (val, agg) -> agg
>>> - val for primitive types, or as complex as you need;
>>>
>>>          // just make sure you get the order right (lesson hard learnt
>>> ;) ), subtraction is not commutative!
>>>
>>>
>>> KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed =
>>> fiveMinuteWindowed
>>>
>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new
>>> Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15), value))
>>>
>>>          // the above rounds the timestamp down to a multiple of 15
>>> minutes
>>>
>>>      .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>
>>>
>>> KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed =
>>> fifteenMinuteWindowed
>>>
>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new
>>> Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60), value))
>>>
>>>          // the above rounds the timestamp down to a multiple of 60
>>> minutes
>>>
>>>      .reduce(/*your adder*/, /*your subtractor*/, "store60m");
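
The adder/subtractor contract of KGroupedTable.reduce() above can be sketched outside of Kafka Streams as a plain Java update loop (a minimal illustration with hypothetical class and field names, not the actual Streams internals): when an upstream record changes, its old value is first subtracted from the coarser aggregate, and only then is the new value added.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch (hypothetical names) of how a reduce with an adder and a
// subtractor keeps a re-keyed sum correct when upstream records are
// *updates* rather than new additions.
public class ReduceSemantics {
    final Map<String, Long> upstream = new HashMap<>();   // fine-grained key -> value
    final Map<String, Long> downstream = new HashMap<>(); // coarse bucket -> sum

    void update(String key, String bucket, long newVal) {
        Long oldVal = upstream.put(key, newVal);
        long agg = downstream.getOrDefault(bucket, 0L);
        if (oldVal != null) agg -= oldVal; // subtractor: undo the previous value first
        agg += newVal;                     // adder: then apply the new value
        downstream.put(bucket, agg);       // order matters: subtraction is not commutative
    }

    public static void main(String[] args) {
        ReduceSemantics r = new ReduceSemantics();
        r.update("min-1", "bucket-5m", 10);
        r.update("min-2", "bucket-5m", 5);  // sum is now 15
        r.update("min-1", "bucket-5m", 3);  // an update, not an addition
        System.out.println(r.downstream.get("bucket-5m")); // 5 + 3 = 8
    }
}
```

If the subtractor were skipped, the third update would leave the bucket at 18 instead of 8, which is exactly the double-counting the KGroupedTable abstraction protects against.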
>>>
>>>
>>> So, step by step:
>>>
>>>    * You use a windowed aggregation only once; from there on you use
>>>      the KTable abstraction only (which doesn't support windowed
>>>      aggregations).
>>>    * In each subsequent groupBy you map the key to a pair of
>>>      (your-real-key, timestamp) where the timestamp is rounded down
>>>      with the precision of the size of the new window.
>>>    * reduce() on a KGroupedTable takes an adder and a subtractor and it
>>>      will correctly update the new aggregate by first subtracting the
>>>      previous value of the upstream record before adding the new value
>>>      (this way, just as you said, the downstream is aware of the
>>>      statefulness of the upstream and correctly treats each record as
>>>      an update)
>>>    * If you want to reduce message volume further, you can break these
>>>      into separate KafkaStreams instances and configure downstream ones
>>>      with a higher commit.interval.ms (unfortunately you can't have
>>>      different values of this setting in different places of the same
>>>      topology I'm afraid)
>>>    * TODO: Look into retention policies, I haven't investigated that in
>>>      any detail.
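
The key rounding used in each of the groupBy calls above is just integer division followed by multiplication; a standalone sketch (the method name is my own, purely illustrative):

```java
// Standalone sketch of the timestamp rounding used in the groupBy calls:
// integer division truncates, so ts / bucketMs * bucketMs rounds ts down
// to the nearest multiple of the bucket size.
public class WindowRounding {
    static long roundDown(long tsMs, long bucketMs) {
        return tsMs / bucketMs * bucketMs;
    }

    public static void main(String[] args) {
        long fiveMin = 1000L * 60 * 5;
        // a 1-minute window ending at 07:00 maps to the 05:00 five-minute bucket
        System.out.println(roundDown(7 * 60 * 1000L, fiveMin));  // 300000
        // timestamps that are already exact multiples are unchanged
        System.out.println(roundDown(10 * 60 * 1000L, fiveMin)); // 600000
    }
}
```

The same expression with 15- and 60-minute bucket sizes yields the keys for the two coarser tables, so all four granularities share one rounding rule.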
>>>
>>> I haven't tested this exact code, so please excuse any typos.
>>>
>>> Also, if someone with more experience could chip in and check that I'm
>>> not talking nonsense here, or suggest an easier way to do this, that
>>> would be great.
>>>
>>>
>>> I don't know if the alternative approach is possible, where you
>>> convert each resulting KTable back into a stream and just do a
>>> windowed aggregation somehow. That would feel more natural, but I
>>> haven't figured out how to correctly window over a changelog in the
>>> KStream abstraction; it feels impossible in the high-level DSL.
>>>
>>> Hope that helps,
>>> Michal
>>>
>>> On 02/05/17 18:03, Garrett Barton wrote:
>>>> Let's say I want to sum values over increasing window sizes of 1, 5, 15 and
>>>> 60 minutes.  Right now I have them running in parallel, meaning if I am
>>>> producing 1k/sec records I am consuming 4k/sec to feed each calculation.
>>>> In reality I am calculating far more than sums, and in this pattern I'm
>>>> looking at something like (producing rate)*(calculations)*(windows) for a
>>>> consumption rate.
>>>>
>>>>   So I had the idea: could I feed the 1 minute window into the 5 minute,
>>>> the 5 into the 15, and the 15 into the 60?  Theoretically I would consume
>>>> a fraction of the records, not have to scale as much, and be back to
>>>> something like (producing rate)*(calculations)+(updates).
>>>>
>>>>    Thinking this was an awesome idea, I went to try and implement it and
>>>> got twisted around.  These are windowed grouping operations that produce
>>>> KTables, which means instead of a raw stream I have an update stream.  To
>>>> me this implies that downstream must be aware of this and consume stateful
>>>> information, knowing that each record is an update and not an addition.
>>>> Does the high-level API handle that construct and let me do that?  For a
>>>> simple sum it would have to hold each of the latest values for, say, the
>>>> five 1-minute sums in a given window, to perform the 5 minute sum.
>>>> Reading the docs, which are awesome, I cannot determine whether
>>>> KTable.groupBy() would work over a window, and whether reduce or
>>>> aggregate would thus do what I need.
>>>>
>>>> Any ideas?
>>>>
>>> -- 
>>> Michal Borowiecki
>>> Senior Software Engineer L4
>>> T: +44 208 742 1600 / +44 203 249 8448
>>> E: michal.borowiecki@openbet.com
>>> W: www.openbet.com <http://www.openbet.com/>
>>>
>>> OpenBet Ltd, Chiswick Park Building 9, 566 Chiswick High Rd, London, W4 5XT, UK
>>>
>>> This message is confidential and intended only for the addressee. If
>>> you have received this message in error, please immediately notify the
>>> postmaster@openbet.com <mailto:postmaster@openbet.com> and delete it
>>> from your system as well as any copies. The content of e-mails as well
>>> as traffic data may be monitored by OpenBet for employment and
>>> security purposes. To protect the environment please do not print this
>>> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
>>> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
>>> company registered in England and Wales. Registered no. 3134634. VAT
>>> no. GB927523612
>>>

