kafka-users mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: How to chain increasing window operations one after another
Date Tue, 09 May 2017 13:21:24 GMT
Just had a thought:

If you implement the Windowed/Tuple serde to store the timestamp(s) 
before the actual record key, then you can simply do a periodic 
ranged query on each of the state stores to find and delete all data 
older than the retention time (using punctuate() inside a Processor).
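
A rough, untested sketch of what I mean (hypothetical class and store
names; assumes the store key is serialized as an 8-byte big-endian
timestamp followed by the record key, so lexicographic order matches
time order; 0.10.x Processor API):

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlCleanupProcessor implements Processor<Bytes, byte[]> {

    private final String storeName;   // e.g. "store5m"
    private final long retentionMs;
    private ProcessorContext context;
    private KeyValueStore<Bytes, byte[]> store;

    public TtlCleanupProcessor(String storeName, long retentionMs) {
        this.storeName = storeName;
        this.retentionMs = retentionMs;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<Bytes, byte[]>) context.getStateStore(storeName);
        context.schedule(60 * 1000L);   // punctuate roughly once a minute
    }

    @Override
    public void process(Bytes key, byte[] value) {
        store.put(key, value);          // pass records through 1-to-1
        context.forward(key, value);
    }

    @Override
    public void punctuate(long timestamp) {
        // One ranged scan finds everything whose timestamp prefix is below the cutoff.
        long cutoff = timestamp - retentionMs;
        Bytes from = Bytes.wrap(ByteBuffer.allocate(8).putLong(0L).array());
        Bytes to = Bytes.wrap(ByteBuffer.allocate(8).putLong(cutoff).array());
        List<Bytes> expired = new ArrayList<>();
        try (KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                expired.add(iter.next().key);
            }
        }
        for (Bytes key : expired) {
            store.delete(key);          // delete after iterating, not during
        }
    }

    @Override
    public void close() { }
}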

Any downsides to that?

Cheers,

Michał


On 09/05/17 09:17, Michal Borowiecki wrote:
> Hi Matthias,
>
> Yes, the ever-growing stores were my concern too. That was the 
> intention behind my TODO note in the first reply; I just didn't want to 
> touch on this until I'd dug deeper into it.
>
> I understand the compaction+retention policy on the backing changelog 
> topics takes care of cleaning up on the broker side, but the RocksDB 
> instances will grow indefinitely, right? (until rebalanced?)
>
>
> Punctuation was the first idea that came to my mind too when I 
> originally faced this problem on my project. However, as you said, it's 
> only available on KStream, and aggregations on KStream actually discard 
> tombstones rather than forwarding them on to the KTable:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L798-L799
>
>      * Aggregate the values of records in this stream by the grouped key.
>      * Records with {@code null} key or value are ignored.
>
> I haven't come up with a satisfactory solution yet, but it's still on 
> my mind.
>
>
> TTLs on stores could potentially solve this issue, and just today they 
> were asked about on SO: 
> http://stackoverflow.com/questions/43860114/kafka-streams-low-level-processor-api-rocksdb-timetolivettl/43862922#43862922
>
> Garrett, was that you? :-)
>
>
> Thanks,
>
> Michał
>
>
> On 08/05/17 23:29, Matthias J. Sax wrote:
>> Thinking about this once more (and also having a fresh memory of another
>> thread about KTables), I am wondering if this approach needs some extra
>> tuning:
>>
>> As the result of the first window aggregation produces an output stream
>> with an unbounded key space, the following (non-windowed) KTables would
>> grow indefinitely, if I'm not missing anything.
>>
>> Thus, it might be required to put in a transform() that forwards all
>> data 1-to-1, but additionally registers a punctuation schedule. When
>> punctuate is called, it would send tombstone messages downstream (or
>> similar) that delete windows older than the retention time. Sounds
>> tricky to implement though... `transform()` would need to keep track of
>> used keys in a custom state to send appropriate tombstones. Also,
>> `transform()` is only available for KStream, and transforming a
>> (windowed) KTable into a KStream and back into a KTable while preserving
>> the required semantics does not seem straightforward.
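>>
>> A rough, untested sketch of that transform() (hypothetical store name;
>> 0.10.x Transformer API; the custom state maps each seen key to its
>> last-seen timestamp):
>>
>> import java.util.ArrayList;
>> import java.util.List;
>>
>> import org.apache.kafka.streams.KeyValue;
>> import org.apache.kafka.streams.kstream.Transformer;
>> import org.apache.kafka.streams.processor.ProcessorContext;
>> import org.apache.kafka.streams.state.KeyValueIterator;
>> import org.apache.kafka.streams.state.KeyValueStore;
>>
>> public class TombstoneTransformer<K, V>
>>         implements Transformer<K, V, KeyValue<K, V>> {
>>
>>     private final long retentionMs;
>>     private ProcessorContext context;
>>     private KeyValueStore<K, Long> lastSeen;   // key -> last-seen timestamp
>>
>>     public TombstoneTransformer(long retentionMs) {
>>         this.retentionMs = retentionMs;
>>     }
>>
>>     @Override
>>     @SuppressWarnings("unchecked")
>>     public void init(ProcessorContext context) {
>>         this.context = context;
>>         // "last-seen" is a hypothetical store registered with the topology
>>         this.lastSeen = (KeyValueStore<K, Long>) context.getStateStore("last-seen");
>>         context.schedule(60 * 1000L);            // punctuate roughly once a minute
>>     }
>>
>>     @Override
>>     public KeyValue<K, V> transform(K key, V value) {
>>         lastSeen.put(key, context.timestamp());  // track the used key
>>         return KeyValue.pair(key, value);        // forward the record 1-to-1
>>     }
>>
>>     @Override
>>     public KeyValue<K, V> punctuate(long timestamp) {
>>         long cutoff = timestamp - retentionMs;
>>         List<K> expired = new ArrayList<>();
>>         try (KeyValueIterator<K, Long> iter = lastSeen.all()) {
>>             while (iter.hasNext()) {
>>                 KeyValue<K, Long> entry = iter.next();
>>                 if (entry.value < cutoff) expired.add(entry.key);
>>             }
>>         }
>>         for (K key : expired) {
>>             context.forward(key, null);          // tombstone for downstream
>>             lastSeen.delete(key);
>>         }
>>         return null;                             // nothing extra to emit
>>     }
>>
>>     @Override
>>     public void close() { }
>> }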
>>
>> Any thoughts about this potential issue?
>>
>>
>> -Matthias
>>
>>
>> On 5/8/17 3:05 PM, Garrett Barton wrote:
>>> Michal,
>>>    This is slick!  I am still writing unit tests to verify it.  My code
>>> looks something like:
>>>
>>> // My val object isn't really called that; just wanted to show a sample
>>> // set of calculations the value can do! I used an aggregate here so I
>>> // could have a non-primitive value object that does the calculations on
>>> // each aggregation; the pojo has an .add(Double) in it.
>>> KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed = srcStream
>>>     .groupByKey(Serdes.String(), Serdes.Double())
>>>     .aggregate(/* initializer */, /* aggregator */,
>>>         TimeWindows.of(60*1000, 60*1000), "store1m");
>>>
>>> // I made my own Tuple2; will move the window calc into it.
>>> // The groupBy below rounds time down to a timestamp divisible by 5 minutes.
>>> // Your subtractor can be as simple as (val, agg) -> agg - val for
>>> // primitive types or as complex as you need; just make sure you get the
>>> // order right (lesson hard learnt ;) ), subtraction is not commutative!
>>> // Again my val object has an .add(Obj) and a .sub() to handle this, so nice!
>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
>>>     oneMinuteWindowed
>>>     .groupBy( (windowedKey, value) -> new KeyValue<>(
>>>         new Tuple2<String, Long>(windowedKey.key(),
>>>             windowedKey.window().end() /1000/60/5 *1000*60*5),
>>>         value), keySerde, valSerde)
>>>     .reduce(/* your adder */, /* your subtractor */, "store5m");
>>>
>>> // The groupBy below rounds time down to a timestamp divisible by 15 minutes.
>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fifteenMinuteWindowed =
>>>     fiveMinuteWindowed
>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
>>>         value), keySerde, valSerde)
>>>     .reduce(/* your adder */, /* your subtractor */, "store15m");
>>>
>>> // The groupBy below rounds time down to a timestamp divisible by 60 minutes.
>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
>>>     fifteenMinuteWindowed
>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
>>>         value), keySerde, valSerde)
>>>     .reduce(/* your adder */, /* your subtractor */, "store60m");
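>>>
>>> To make the rounding concrete: for a window end of 12:07:00, i.e.
>>> 43,620,000 ms past midnight, 43620000 /1000/60/5 = 145 by integer
>>> division (truncating 145.4), and 145 *1000*60*5 = 43,500,000 ms,
>>> i.e. 12:05:00.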
>>>
>>>
>>> Notes thus far:
>>>    Doesn't look like I need to start the 5-minute stage with a windowed
>>> KTable return object; it starts with the regular
>>> KTable<Tuple2<String, Long>> in this case.
>>>    I'm thinking about using windowedKey.window().start() instead of end(),
>>> as I believe that is more consistent with what the windows themselves put
>>> out. They go into the stores keyed by their start time, I believe.
>>>    Serdes get nuts, as does the generic typing on some of these classes
>>> (yeah, you, KeyValueMapper); makes for long code! I had to specify them
>>> everywhere since the keys/values changed.
>>>
>>>
>>> I didn't get enough time to mess with it today; I will wrap up the unit
>>> tests tomorrow and run it to see how it performs against my real data.
>>> I expect a huge reduction in resources (both streams and kafka storage)
>>> by moving to this.
>>> Thank you!
>>>
>>>
>>>
>>> On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matthias@confluent.io>
>>> wrote:
>>>
>>>> Michal,
>>>>
>>>> that's an interesting idea. In an ideal world, Kafka Streams should have
>>>> an optimizer that is able to do this automatically under the hood. Too
>>>> bad we are not there yet.
>>>>
>>>> @Garrett: did you try this out?
>>>>
>>>> This seems to be a question that might affect many users, and it might
>>>> be worth documenting somewhere as a recommended pattern.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
>>>>> Apologies,
>>>>>
>>>>> In the code snippet, of course, only the oneMinuteWindowed KTable will
>>>>> have a Windowed key (KTable<Windowed<Key>, Value>); all others would be
>>>>> just KTable<Tuple2<Key, Long>, Value>.
>>>>>
>>>>> Michał
>>>>>
>>>>> On 07/05/17 16:09, Michal Borowiecki wrote:
>>>>>> Hi Garrett,
>>>>>>
>>>>>> I've encountered a similar challenge in a project I'm working on (it's
>>>>>> still a work in progress, so please take my suggestions with a grain of
>>>>>> salt).
>>>>>>
>>>>>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>>>>>> for with something like the following (same snippet attached as a txt
>>>>>> file):
>>>>>>
>>>>>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream
>>>>>>     // where Key and Value stand for your actual key and value types
>>>>>>     .groupByKey()
>>>>>>     .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
>>>>>>     // where your adder can be as simple as (val, agg) -> agg + val
>>>>>>     // for primitive types or as complex as you need
>>>>>>
>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
>>>>>>     oneMinuteWindowed    // Tuple2 as defined by the javaslang library
>>>>>>     .groupBy( (windowedKey, value) -> new KeyValue<>(
>>>>>>         new Tuple2<>(windowedKey.key(),
>>>>>>             windowedKey.window().end() /1000/60/5 *1000*60*5),
>>>>>>         value))
>>>>>>     // the above rounds time down to a timestamp divisible by 5 minutes
>>>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>>>>     // where your subtractor can be as simple as (val, agg) -> agg - val
>>>>>>     // for primitive types or as complex as you need; just make sure
>>>>>>     // you get the order right (lesson hard learnt ;) ), subtraction is
>>>>>>     // not commutative!
>>>>>>
>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
>>>>>>     fiveMinuteWindowed
>>>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>>>>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/15 *1000*60*15),
>>>>>>         value))
>>>>>>     // the above rounds time down to a timestamp divisible by 15 minutes
>>>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>>>>
>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
>>>>>>     fifteenMinuteWindowed
>>>>>>     .groupBy( (keyPair, value) -> new KeyValue<>(
>>>>>>         new Tuple2<>(keyPair._1, keyPair._2 /1000/60/60 *1000*60*60),
>>>>>>         value))
>>>>>>     // the above rounds time down to a timestamp divisible by 60 minutes
>>>>>>     .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>>>>>>
>>>>>>
>>>>>> So, step by step:
>>>>>>
>>>>>>    * You use a windowed aggregation only once; from there on you use
>>>>>>      the KTable abstraction only (which doesn't have windowed
>>>>>>      aggregations).
>>>>>>    * In each subsequent groupBy you map the key to a pair of
>>>>>>      (your-real-key, timestamp), where the timestamp is rounded down
>>>>>>      with the precision of the size of the new window.
>>>>>>    * reduce() on a KGroupedTable takes an adder and a subtractor, and
>>>>>>      it will correctly update the new aggregate by first subtracting
>>>>>>      the previous value of the upstream record before adding the new
>>>>>>      value (this way, just as you said, the downstream is aware of the
>>>>>>      statefulness of the upstream and correctly treats each record as
>>>>>>      an update).
>>>>>>    * If you want to reduce message volume further, you can break these
>>>>>>      into separate KafkaStreams instances and configure downstream ones
>>>>>>      with a higher commit.interval.ms, as in the config sketch below
>>>>>>      (unfortunately you can't have different values of this setting in
>>>>>>      different places of the same topology, I'm afraid).
>>>>>>    * TODO: Look into retention policies; I haven't investigated that in
>>>>>>      any detail.
>>>>>>
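>>>>>> A minimal sketch of that downstream configuration (untested; the
>>>>>> application id and interval are illustrative, and `builder` is assumed
>>>>>> to hold the downstream topology):
>>>>>>
>>>>>> Properties props = new Properties();
>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rollup-downstream");
>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>> // flush cached updates once a minute instead of the 30s default
>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 60000);
>>>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>> streams.start();
>>>>>>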
>>>>>> I haven't tested this exact code, so please excuse any typos.
>>>>>>
>>>>>> Also, if someone with more experience could chip in and check that I'm
>>>>>> not talking nonsense here, or if there's an easier way to do this, that
>>>>>> would be great.
>>>>>>
>>>>>>
>>>>>> I don't know if the alternative approach is possible, where you
>>>>>> convert each resulting KTable back into a stream and just do a
>>>>>> windowed aggregation somehow. That would feel more natural, but I
>>>>>> haven't figured out how to correctly window over a changelog in the
>>>>>> KStream abstraction; it feels impossible in the high-level DSL.
>>>>>>
>>>>>> Hope that helps,
>>>>>> Michal
>>>>>>
>>>>>> On 02/05/17 18:03, Garrett Barton wrote:
>>>>>>> Let's say I want to sum values over increasing window sizes of 1, 5,
>>>>>>> 15, and 60 minutes. Right now I have them running in parallel, meaning
>>>>>>> if I am producing 1k/sec records I am consuming 4k/sec to feed each
>>>>>>> calculation. In reality I am calculating far more than a sum, and in
>>>>>>> this pattern I'm looking at something like
>>>>>>> (producing rate)*(calculations)*(windows) for a consumption rate.
>>>>>>>
>>>>>>>   So I had the idea: could I feed the 1-minute window into the
>>>>>>> 5-minute, the 5 into the 15, and the 15 into the 60? Theoretically I
>>>>>>> would consume a fraction of the records, not have to scale as huge,
>>>>>>> and be back to something like
>>>>>>> (producing rate)*(calculations)+(updates).
>>>>>>>
>>>>>>>    Thinking this was an awesome idea, I went to try and implement it
>>>>>>> and got twisted around. These are windowed grouping operations that
>>>>>>> produce KTables, which means instead of a raw stream I have an update
>>>>>>> stream. To me this implies that downstream must be aware of this and
>>>>>>> consume stateful information, knowing that each record is an update
>>>>>>> and not an addition. Does the high-level API handle that construct and
>>>>>>> let me do that? For a simple sum it would have to hold each of the
>>>>>>> latest values for, say, the five 1-minute sums in a given window, to
>>>>>>> perform the 5-minute sum. Reading the docs, which are awesome, I
>>>>>>> cannot determine if KTable.groupBy() would work over a window, and
>>>>>>> would reduce or aggregate thus do what I need?
>>>>>>>
>>>>>>> Any ideas?
>>>>>>>
>>>>
>

