kafka-users mailing list archives

From Fq Public <fq.publ...@gmail.com>
Subject Clarify “the order of execution for the subtractor and adder is not defined”
Date Fri, 29 Jan 2021 00:50:46 GMT
The Streams DSL documentation
<https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
includes a caveat about using the aggregate method to transform a
KGroupedTable → KTable, as follows (emphasis mine):

When subsequent non-null values are received for a key (e.g., UPDATE), then
(1) the subtractor is called with the old value as stored in the table and
(2) the adder is called with the new value of the input record that was
just received. *The order of execution for the subtractor and adder is not
defined.*

My interpretation of that last line is that one of three things can
happen:

   1. subtractor can be called before adder
   2. adder can be called before subtractor
   3. adder and subtractor could be called at the same time

Here is the question I'm looking to get answered:
*Are all 3 scenarios above actually possible when using the aggregate
method on a KGroupedTable?*
Or am I misinterpreting the documentation? For my use-case (detailed
below), it would be ideal if the subtractor was always called before the
adder.
------------------------------

*Why is this question important?*

If the adder and subtractor do not commute with each other and the order in
which they are executed can vary, the aggregate can end up with different
results depending on which of the two runs first. A useful example of such
a pair of operations is aggregating records into a Set:

.aggregate[Set[Animal]](Set.empty)(
  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)

In this example, for a duplicated event (an update whose new value equals
the old value), if the adder is called before the subtractor, the value
ends up removed from the set entirely (which I imagine would be problematic
for most use cases).
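
A minimal sketch of the two orderings in plain Scala, with no Kafka dependency. The `Animal` case class, the key "zoo-1", and the object name are hypothetical; `adder` and `subtractor` only mirror the shape of the functions passed to aggregate above, and nothing here says which order Kafka Streams actually uses.

```scala
// Plain Scala model of the Set aggregation; no Kafka Streams classes involved.
// `Animal` and the key "zoo-1" are hypothetical, for illustration only.
object AggregationOrderDemo {
  final case class Animal(name: String)

  // Same shape as the functions passed to aggregate: (key, value, aggregate) => aggregate
  val adder: (String, Animal, Set[Animal]) => Set[Animal] =
    (_, animal, animals) => animals + animal
  val subtractor: (String, Animal, Set[Animal]) => Set[Animal] =
    (_, animal, animals) => animals - animal

  val lion: Animal = Animal("lion")

  // Aggregate state after the first event for this key.
  val current: Set[Animal] = Set(lion)

  // Duplicate event: the old value and the new value are both `lion`.
  val subtractThenAdd: Set[Animal] =
    adder("zoo-1", lion, subtractor("zoo-1", lion, current))
  val addThenSubtract: Set[Animal] =
    subtractor("zoo-1", lion, adder("zoo-1", lion, current))

  def main(args: Array[String]): Unit = {
    println(s"subtractor then adder: $subtractThenAdd") // Set(Animal(lion))
    println(s"adder then subtractor: $addThenSubtract") // Set()
  }
}
```

With the subtractor first, the set still contains the animal after the duplicate; with the adder first, the set ends up empty.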
------------------------------

*Why am I doubting the documentation (assuming my interpretation of it is
correct)?*

   1. Seems like an unusual design choice
   2. When I've run unit tests (using TopologyTestDriver and
   EmbeddedKafka), I always see the subtractor is called before the adder.
   Unfortunately, if there is some kind of race condition involved, it's
   entirely possible that I would never hit the other scenarios.
   3. I did try looking into the kafka-streams codebase as well. The
   KTableProcessorSupplier that calls the user-supplied adder/subtractor
   functions appears to be this one:
   https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
   and on line 92, you can even see a comment saying "first try to remove the
   old value". Unfortunately, during my own testing, what I saw was that the
   process function itself is called twice: first with a Change<V> value that
   includes only the old value, and then again with a Change<V> value that
   includes only the new value. I haven't been able to dig deep enough to
   find the internal code that generates the old-value record and the
   new-value record (upon receiving an update) to determine whether it
   actually produces those records in that order.
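
A simplified model of the behaviour described in point 3, in plain Scala. This is only a sketch under stated assumptions, not the Kafka Streams implementation: `Change` here is a hypothetical stand-in for the internal Change<V> class (with Option in place of null), and `process` only imitates the "remove the old value first, then add the new value" structure suggested by the comment in KTableAggregate. It shows why the arrival order of the two records matters even if each single process call always subtracts before it adds.

```scala
// Simplified model; `Change` and `process` are hypothetical stand-ins,
// not the real Kafka Streams classes.
object ChangeOrderModel {
  // None models a null old/new value.
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // One process call: first remove the old value (if any), then add the new one (if any).
  def process(agg: Set[String], change: Change[String]): Set[String] = {
    val afterRemove = change.oldValue.fold(agg)(agg - _)
    change.newValue.fold(afterRemove)(afterRemove + _)
  }

  // Aggregate state before the duplicate update arrives.
  val state: Set[String] = Set("lion")

  // The update observed as two records: one with only the old value, one with only the new.
  val oldOnly: Change[String] = Change(newValue = None, oldValue = Some("lion"))
  val newOnly: Change[String] = Change(newValue = Some("lion"), oldValue = None)

  val oldRecordFirst: Set[String] = List(oldOnly, newOnly).foldLeft(state)(process)
  val newRecordFirst: Set[String] = List(newOnly, oldOnly).foldLeft(state)(process)

  def main(args: Array[String]): Unit = {
    println(s"old-value record first: $oldRecordFirst") // Set(lion)
    println(s"new-value record first: $newRecordFirst") // Set()
  }
}
```

In this model the result depends entirely on which of the two records reaches process first, which is exactly the part I have not been able to confirm from the code.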
