kafka-users mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: How to chain increasing window operations one after another
Date Mon, 08 May 2017 08:43:45 GMT
Apologies,

In the code snippet, of course, only the oneMinuteWindowed KTable will have a 
Windowed key (KTable<Windowed<Key>, Value>); all the others would be just 
KTable<Tuple2<Key, Long>, Value>.
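For reference, the rounding trick used throughout the snippet below (integer-divide by the window size, then multiply back) can be sketched in plain Java. The class and method names here are illustrative only, not from Kafka Streams:

```java
public class WindowRounding {
    // Round an epoch-millis timestamp down to the start of its enclosing
    // window, e.g. windowSizeMs = 5 * 60 * 1000 for 5-minute windows.
    // Integer division truncates, so (t / w) * w is the largest multiple
    // of w that is <= t.
    static long roundDown(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs * windowSizeMs;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        // 12:07:30 (as millis since midnight) rounds down to 12:05:00
        long t = 12L * 3600_000 + 7 * 60_000 + 30_000;
        System.out.println(roundDown(t, fiveMin)); // prints 43500000, i.e. 12:05:00
    }
}
```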

Michał

On 07/05/17 16:09, Michal Borowiecki wrote:
>
> Hi Garrett,
>
> I've encountered a similar challenge in a project I'm working on (it's 
> still work in progress, so please take my suggestions with a grain of 
> salt).
>
> Yes, I believe KTable.groupBy lets you accomplish what you are aiming 
> for with something like the following (same snippet attached as txt file):
>
>
> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream    // where 
> Key and Value stand for your actual key and value types
>
>     .groupByKey()
>
>     .reduce(/*your adder*/, TimeWindows.of(60*1000), "store1m");
>
>         // where your adder can be as simple as (val, agg) -> agg + val
>         // for primitive types, or as complex as you need
>
>
> KTable<Tuple2<Key, Long>, Value> fiveMinuteWindowed = 
> oneMinuteWindowed    // Tuple2 for this example as defined by the 
> javaslang library
>
>     .groupBy( (windowedKey, value) -> new KeyValue<>(
>         new Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5 *1000*60*5),
>         value))
>
>         // the above rounds the window end time down to a timestamp 
> divisible by 5 minutes
>
>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>
>         // where your subtractor can be as simple as (val, agg) -> agg - val
>         // for primitive types, or as complex as you need;
>         // just make sure you get the order right (lesson hard learnt 
> ;) ), subtraction is not commutative!
>
>
> KTable<Tuple2<Key, Long>, Value> fifteenMinuteWindowed = 
> fiveMinuteWindowed
>
>     .groupBy( (keyPair, value) -> new KeyValue<>(
>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
>         value))
>
>         // the above rounds the timestamp down to a timestamp divisible 
> by 15 minutes
>
>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>
>
> KTable<Tuple2<Key, Long>, Value> sixtyMinuteWindowed = 
> fifteenMinuteWindowed
>
>     .groupBy( (keyPair, value) -> new KeyValue<>(
>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
>         value))
>
>         // the above rounds the timestamp down to a timestamp divisible 
> by 60 minutes
>
>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>
>
> So, step by step:
>
>   * You use a windowed aggregation only once; from there on you use
>     the KTable abstraction only (which doesn't have windowed
>     aggregations).
>   * In each subsequent groupBy you map the key to a pair of
>     (your-real-key, timestamp), where the timestamp is rounded down
>     to the size of the new window.
>   * reduce() on a KGroupedTable takes an adder and a subtractor, and it
>     will correctly update the new aggregate by first subtracting the
>     previous value of the upstream record before adding the new value
>     (this way, just as you said, the downstream is aware of the
>     statefulness of the upstream and correctly treats each record as
>     an update).
>   * If you want to reduce message volume further, you can break these
>     into separate KafkaStreams instances and configure downstream ones
>     with a higher commit.interval.ms (unfortunately you can't have
>     different values for this setting in different parts of the same
>     topology).
>   * TODO: look into retention policies; I haven't investigated that in
>     any detail.
>
> I haven't tested this exact code, so please excuse any typos.
>
> Also, if someone with more experience could chip in and check if I'm 
> not talking nonsense here, or if there's an easier way to do this, that 
> would be great.
>
>
> I don't know if the alternative approach is possible, where you 
> convert each resulting KTable back into a stream and just do a 
> windowed aggregation somehow. That would feel more natural, but I 
> haven't figured out how to correctly window over a changelog in the 
> KStream abstraction, feels impossible in the high-level DSL.
>
> Hope that helps,
> Michal
>
> On 02/05/17 18:03, Garrett Barton wrote:
>> Let's say I want to sum values over increasing window sizes of 1, 5, 15 and 60
>> minutes.  Right now I have them running in parallel, meaning if I am
>> producing 1k/sec records I am consuming 4k/sec to feed each calculation.
>> In reality I am calculating far more than a sum, and in this pattern I'm
>> looking at something like (producing rate)*(calculations)*(windows) for a
>> consumption rate.
>>
>>   So I had the idea: could I feed the 1 minute window into the 5 minute,
>> 5 into 15, and 15 into 60?  Theoretically I would consume a fraction of the
>> records, not have to scale as much, and be back to something like (producing
>> rate)*(calculations)+(updates).
>>
>>    Thinking this was an awesome idea, I went to try to implement it and got
>> twisted around.  These are windowed grouping operations that produce
>> KTables, which means instead of a raw stream I have an update stream.  To
>> me this implies that downstream must be aware of this and consume stateful
>> information, knowing that each record is an update and not an addition.
>> Does the high-level API handle that construct and let me do that?  For
>> a simple sum it would have to hold each of the latest values for, say, the
>> five 1-minute sums in a given window to perform the 5-minute sum.  Reading the
>> docs, which are awesome, I cannot determine whether KTable.groupBy() would
>> work over a window, and whether reduce or aggregate would do what I need.
>>
>> Any ideas?
>>
>
> -- 
> Michal Borowiecki
> Senior Software Engineer L4
> T: +44 208 742 1600 · +44 203 249 8448
> E: michal.borowiecki@openbet.com
> W: www.openbet.com <http://www.openbet.com/>
>
> OpenBet Ltd, Chiswick Park Building 9, 566 Chiswick High Rd, London, W4 5XT, UK
>
> This message is confidential and intended only for the addressee. If 
> you have received this message in error, please immediately notify the 
> postmaster@openbet.com <mailto:postmaster@openbet.com> and delete it 
> from your system as well as any copies. The content of e-mails as well 
> as traffic data may be monitored by OpenBet for employment and 
> security purposes. To protect the environment please do not print this 
> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park 
> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A 
> company registered in England and Wales. Registered no. 3134634. VAT 
> no. GB927523612
>


