kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Shouldn't the initializer of a stream aggregate accept the key?
Date Fri, 05 May 2017 04:54:57 GMT
Yes. The key can be of any type, and we cannot enforce immutable types at
the API level, so it could get modified as a "side effect".

The problem is that if the key were modified, it would corrupt data
partitioning and thus lead to wrong results.
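
To make the hazard concrete, here is a minimal plain-Java sketch. It does
not use the Kafka Streams API: `UserKey`, `partitionFor`, and
`initialValueFor` are hypothetical names, and the modulo "partitioner" is a
simplified stand-in, not Kafka's actual hashing. It only illustrates that a
key mutated by user code may no longer map to the partition it was routed to.

```java
public class MutableKeyDemo {
    // Hypothetical mutable key type; the non-final field makes the hazard possible.
    static final class UserKey {
        int id;
        UserKey(int id) { this.id = id; }
    }

    // Simplified stand-in for a partitioner (assumption: not Kafka's real hashing).
    static int partitionFor(UserKey key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key.id), numPartitions);
    }

    // User code that was only supposed to read the key to pick an initial value.
    static long initialValueFor(UserKey key) {
        key.id++;   // accidental side effect on the key
        return 0L;
    }

    public static void main(String[] args) {
        UserKey key = new UserKey(5);
        int before = partitionFor(key, 4);
        initialValueFor(key);
        int after = partitionFor(key, 4);
        // prints "partition before=1, after=2"
        System.out.println("partition before=" + before + ", after=" + after);
    }
}
```

The record was routed using the original key, but any later lookup or write
that uses the mutated key would point at a different partition.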

It's not possible to modify the key via return -- the returned value
will only be the initial value for the aggregation.
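
One possible shape for the "custom code" mentioned in the quoted message
below: since the adder does receive the key, the initializer can return an
"empty" marker and the adder can derive the real initial value from the key
on first use. This is only a sketch in plain Java. The `Initializer` and
`Aggregator` interfaces here are local stand-ins that mirror the shapes
discussed in this thread, `initialValueFor` and `run` are hypothetical
helpers, and a real topology would also need a serde for the `Optional`
aggregate (not shown).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class KeyAwareInitSketch {
    // Local stand-ins for the interfaces discussed above (assumption: shapes only).
    interface Initializer<VA> { VA apply(); }
    interface Aggregator<K, V, VA> { VA apply(K key, V value, VA aggregate); }

    // Hypothetical key-dependent initial value: here, the key's length.
    static long initialValueFor(String key) { return key.length(); }

    // The initializer cannot see the key, so it returns an "empty" marker.
    static final Initializer<Optional<Long>> INIT = Optional::empty;

    // The adder sees the key and fills in the initial value on first use.
    static final Aggregator<String, Long, Optional<Long>> ADDER =
        (key, value, agg) -> Optional.of(agg.orElseGet(() -> initialValueFor(key)) + value);

    // Tiny in-memory imitation of "call the initializer, then the adder".
    static Map<String, Long> run(List<Map.Entry<String, Long>> records) {
        Map<String, Optional<Long>> store = new HashMap<>();
        for (Map.Entry<String, Long> r : records) {
            Optional<Long> agg = store.get(r.getKey());
            if (agg == null) agg = INIT.apply();   // first time this key is seen
            store.put(r.getKey(), ADDER.apply(r.getKey(), r.getValue(), agg));
        }
        Map<String, Long> out = new HashMap<>();
        store.forEach((k, v) -> out.put(k, v.get()));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("ab", 1L));
        records.add(new SimpleEntry<>("ab", 2L));
        records.add(new SimpleEntry<>("xyz", 10L));
        Map<String, Long> out = run(records);
        System.out.println(out.get("ab"));   // 2 (initial) + 1 + 2 = 5
        System.out.println(out.get("xyz"));  // 3 (initial) + 10 = 13
    }
}
```

The key is only read here, never modified, so partitioning is unaffected.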


On 5/4/17 6:00 PM, João Peixoto wrote:
> So the reasoning is that the key may be a mutable object, which in turn
> could potentially cause disaster?
> Just clarifying because I think the initializer should only return a value
> (as it does right now).
> On Thu, May 4, 2017 at 3:02 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
>> Currently, you don't get any hold on the key, because the key must be
>> protected from modification.
>> Thus, the initial value must be the same for all keys atm.
>> We are aware that this limits some use cases and there is already a
>> proposal to add keys to some API.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>> It might make sense to include the `Initializer` interface into KIP-149,
>> too.
>> Atm, you would need to write some custom code to tackle this use case.
>> -Matthias
>> On 5/3/17 10:38 PM, João Peixoto wrote:
>>> Looking at the aggregate documentation
>>> <http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating>,
>>> one of the required items is an "initializer", which takes no arguments
>>> and returns a value.
>>> Shouldn't this initializer follow an approach similar to Java's
>>> computeIfAbsent
>>> <https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function->
>>> and pass the key being initialized to said "initializer"?
>>> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
>>>     (aggKey) -> 0L, // Difference here
>>>     (aggKey, newValue, aggValue) -> ...
>>> The documentation says "When a *record key* is received for the first
>>> time, the initializer is called (and called before the adder).", so there
>>> may be use cases where the new value may be influenced by the key being
>>> initialized.
