kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From João Peixoto <joao.harti...@gmail.com>
Subject Shouldn't the initializer of a stream aggregate accept the key?
Date Thu, 04 May 2017 05:38:23 GMT
Looking at the aggregate documentation
<http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating> one
of the required items is an "initializer", no arguments and returns a value.

Shouldn't this initializer follow a similar approach of Java's
computIfAbsent
<https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function->
and
pass the key being initialized to said "initializer"?

KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    (aggKey) -> 0L, // Difference here
    (aggKey, newValue, aggValue) -> ...

The documentation says "When a *record key* is received for the first time,
the initializer is called (and called before the adder).", so there may be
use cases where the new value may be influenced by the key being
initialized.

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message