kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Issues with KTable to KTable leftJoin
Date Fri, 01 Feb 2019 22:29:20 GMT
> So, per my understanding, maybe if I materialize the first join and
> then join it with the other topic, this should change?

Yes. For this case, we don't need to recompute the old value, because
the downstream operator can look up the old join result in its store.

Note that the `ValueJoiner#apply()` method is called twice; however,
the old and the new value are sent to the downstream operator in a
single message (thus, the order of the two calls does not really
matter). Furthermore, because the old join result is not stored, it has
to be recomputed from the old input value, and this must happen before
the input KTable store is updated with the new value.

About modifying input values: yes, they are expected not to be modified
(unfortunately, Java does not have a mechanism to enforce this). If your
joiner modifies its inputs anyway, it might happen to work correctly when
there is only one join, but that is a coincidence and no guarantee is
provided. It is still considered a bug in the ValueJoiner UDF, even if
you got lucky and it works for that case.
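One way to honor this contract is to make the joined value immutable, so a joiner cannot modify its input even by accident. The sketch below assumes a hypothetical, simplified immutable `Author` (a name plus a set of publisher names) and uses `java.util.function.BiFunction` as a stand-in for Kafka's `ValueJoiner`, which has the same `apply(value1, value2)` shape. It also mirrors the null handling from the topology quoted further down (a null right-hand value becomes an empty set).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch: Author is a simplified, immutable stand-in for the
// POJO discussed in this thread; BiFunction stands in for ValueJoiner.
final class PureJoinerSketch {

    static final class Author {
        final String name;
        final Set<String> publishers;

        Author(String name, Set<String> publishers) {
            this.name = name;
            // Defensive, read-only copy: nobody can change it after construction.
            this.publishers = Collections.unmodifiableSet(new HashSet<>(publishers));
        }

        // Returns a new Author instead of modifying this one.
        Author withPublishers(Set<String> newPublishers) {
            return new Author(name, newPublishers);
        }
    }

    // A null right-hand value (no match yet, or a tombstone) becomes an empty set.
    static final BiFunction<Author, Set<String>, Author> PUBLISHER_JOINER =
        (author, publishers) -> author.withPublishers(
            publishers == null ? Collections.<String>emptySet() : publishers);

    public static void main(String[] args) {
        Author input = new Author("author-1", Collections.<String>emptySet());
        // Same input, joined first with the new and then with the old right value.
        Author newResult = PUBLISHER_JOINER.apply(input, Collections.singleton("publisher-1"));
        Author oldResult = PUBLISHER_JOINER.apply(input, null);
        System.out.println(newResult.publishers); // [publisher-1]
        System.out.println(oldResult.publishers); // []
        System.out.println(input.publishers);     // [] (input untouched)
    }
}
```

Because every call returns a fresh instance, computing the old result after the new one cannot change the new one. This does not make Kafka enforce anything; it only makes accidental mutation impossible for this particular type.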

If you think the JavaDocs are not explicit enough, please open a PR and
propose a change. We are happy to improve them!


On 2/1/19 12:24 PM, Murilo Tavares wrote:
> Hi Matthias
> Thank you for your help. So, per my understanding, maybe if I materialize
> the first join and then join it with the other topic, this should
> change?
> What I don't understand is why the old value is sent AFTER the new value.
> That still looks wrong to me, especially because in a leftJoin we can't
> tell whether this is an old or a new value, as we can in aggregations
> (where we have an adder and a subtractor).
> Anyway, I managed to work around this with the help of this Stack Overflow question:
> https://stackoverflow.com/questions/51565727/kafka-stream-chained-leftjoin-processing-previous-old-message-again-after-the,
> where there's a comment that says: "When joining, I need to initialize and
> return a new object rather than assign value to the old object".
> In my scenario, when I receive an update on the right table, I am UPDATING
> the value from the left table and returning it. Then the joiner is
> called again with the old value from the right table, and the left table's
> value is updated with the old, wrong value.
> This can be fixed by cloning the value that will be modified, so that the
> second call does not incorrectly modify it.
> For reference, the KafkaStreams code that calls my joiner is this:
>     newValue = joiner.apply(change.newValue, value2);
>     if (sendOldValues) {
>         oldValue = joiner.apply(change.oldValue, value2);
>     }
>     context().forward(key, new Change<>(newValue, oldValue));
> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java#L94
> In my situation, value2 is modified inside "joiner" and returned (thus
> assigned to newValue). Then, on the second call, value2 is modified again
> with the oldValue, and since newValue is the same instance, newValue
> changes too.
> Although cloning value2 fixes my problem, I still think it's an issue,
> mainly because this behaviour doesn't affect single (non-cascading) left
> joins, which sounds like an inconsistency to me, and also because the
> order of the operations seems wrong to me. Calling a joiner with the old
> value AFTER the new value does not allow this function to know the last
> known value for that message.
> Still, even if you are not convinced this is an issue, I believe the
> "leftJoin" Javadoc should at least mention the side effects of modifying
> and returning the same instance received as an argument.
> Thanks
> Murilo
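The aliasing described in this message can be reproduced without Kafka. The following is a toy model, not Kafka code: `MutableAuthor`, `MUTATING`, `CLONING`, `Forwarded` and `forward()` are hypothetical names, and `forward()` only mirrors the order of the two `apply()` calls in the quoted `KTableKTableRightJoin` snippet when `sendOldValues` is true (it ignores the argument swapping Kafka does internally for the right-hand side). Without `sendOldValues`, only the first call happens, which is consistent with a single left join not being affected.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

// Toy model (not Kafka code). All names here are hypothetical; forward()
// mirrors the order of the two apply() calls when sendOldValues is true.
final class AliasingSketch {

    static final class MutableAuthor {
        Set<String> publishers = new HashSet<>();

        MutableAuthor copy() {
            MutableAuthor c = new MutableAuthor();
            c.publishers = new HashSet<>(publishers);
            return c;
        }
    }

    // The pair that would be forwarded downstream in a single message.
    static final class Forwarded {
        final MutableAuthor newValue;
        final MutableAuthor oldValue;

        Forwarded(MutableAuthor newValue, MutableAuthor oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // A null right-hand value becomes an empty set, as in the thread's joiner.
    static Set<String> copyOrEmpty(Set<String> pubs) {
        Set<String> copy = new HashSet<>();
        if (pubs != null) {
            copy.addAll(pubs);
        }
        return copy;
    }

    // Buggy: modifies and returns the very instance it received.
    static final BiFunction<MutableAuthor, Set<String>, MutableAuthor> MUTATING =
        (author, pubs) -> {
            author.publishers = copyOrEmpty(pubs);
            return author;
        };

    // Fixed: works on a copy, so every call returns a distinct instance.
    static final BiFunction<MutableAuthor, Set<String>, MutableAuthor> CLONING =
        (author, pubs) -> {
            MutableAuthor result = author.copy();
            result.publishers = copyOrEmpty(pubs);
            return result;
        };

    static Forwarded forward(BiFunction<MutableAuthor, Set<String>, MutableAuthor> joiner,
                             MutableAuthor left, Set<String> newRight, Set<String> oldRight) {
        MutableAuthor newValue = joiner.apply(left, newRight); // computed first
        MutableAuthor oldValue = joiner.apply(left, oldRight); // then the old one
        return new Forwarded(newValue, oldValue);              // sent together
    }

    public static void main(String[] args) {
        Set<String> apSet = new HashSet<>();
        apSet.add("publisher-1");
        // The first publisher update has no previous value, so oldRight is null.
        Forwarded buggy = forward(MUTATING, new MutableAuthor(), apSet, null);
        Forwarded fixed = forward(CLONING, new MutableAuthor(), apSet, null);
        System.out.println(buggy.newValue.publishers); // [] (clobbered by the second call)
        System.out.println(fixed.newValue.publishers); // [publisher-1]
    }
}
```

With `MUTATING`, `newValue` and `oldValue` are the same object, so the forwarded new value ends up holding the old (empty) publishers, which matches the symptom reported further down in this thread.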
> On Fri, 1 Feb 2019 at 14:38, Matthias J. Sax <matthias@confluent.io> wrote:
>> Sounds like expected behavior to me.
>> Note that, by default, the result KTable of a KTable-KTable join is not
>> materialized. To compute the correct result in this case, we need to
>> send both the old and the new join result downstream, so that the
>> downstream join can compute its result correctly. It's a
>> storage/computation trade-off.
>> Does this answer your question?
>> -Matthias
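The storage/computation trade-off can be illustrated with a toy model (not Kafka's implementation). A downstream join can only emit its own old result if it can obtain the old upstream result, either from the message itself (nothing is stored) or from a local store (the upstream result is materialized, which in Kafka Streams corresponds to using a `KTable#leftJoin` overload that takes a `Materialized` argument). `Change`, `withoutStore` and `WithStore` are hypothetical names, and a `HashMap` stands in for a state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model of the storage/computation trade-off; not Kafka's implementation.
final class TradeOffSketch {

    // Hypothetical old/new pair carried by one downstream message.
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(String newValue, String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    static final BiFunction<String, String, String> JOINER = (left, right) -> left + "&" + right;

    // Option A (not materialized): no store, so the old upstream result must
    // travel in the message and the downstream recomputes its old result from it.
    static Change withoutStore(Change upstream, String right) {
        String oldResult = upstream.oldValue == null ? null : JOINER.apply(upstream.oldValue, right);
        return new Change(JOINER.apply(upstream.newValue, right), oldResult);
    }

    // Option B (materialized): the upstream result is kept locally, so only the
    // new value has to travel; the previous one is looked up in the store.
    static final class WithStore {
        private final Map<String, String> upstreamResults = new HashMap<>();

        Change apply(String key, String newUpstream, String right) {
            String previous = upstreamResults.put(key, newUpstream); // previous value or null
            String oldResult = previous == null ? null : JOINER.apply(previous, right);
            return new Change(JOINER.apply(newUpstream, right), oldResult);
        }
    }

    public static void main(String[] args) {
        WithStore store = new WithStore();
        store.apply("k", "r1", "books");
        Change viaStore = store.apply("k", "r2", "books");
        Change viaMessage = withoutStore(new Change("r2", "r1"), "books");
        System.out.println(viaStore.newValue + " / " + viaStore.oldValue);     // r2&books / r1&books
        System.out.println(viaMessage.newValue + " / " + viaMessage.oldValue); // r2&books / r1&books
    }
}
```

Both options produce the same pair; they differ only in whether the old value is shipped with every update (more computation and traffic, no state) or kept in local state (more storage).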
>> On 1/29/19 1:51 PM, Murilo Tavares wrote:
>>> Adding a bit more to this, it looks like an issue with cascading
>>> leftJoins...
>>> I can see that internally, the ValueJoiner is being called by a
>>> KTableKTableRightJoin$KTableKTableRightJoinProcessor, with
>>> sendOldValues = true.
>>> I'm not quite sure, but it looks to me that when we create
>>> "tmp = A.leftJoin(B)", internally Kafka Streams also creates
>>> B.rightJoin(A), and sets sendOldValues on A. Later, by calling
>>> tmp.leftJoin(C), it could be setting sendOldValues on B, or something
>>> like that...
>>> Are there any known issues with cascading leftJoins?
>>> Thanks
>>> Murilo
>>> On Tue, 29 Jan 2019 at 14:10, Murilo Tavares <murilofla@gmail.com> wrote:
>>>> Hi
>>>> I am trying to understand why a KTable to KTable left join is being
>>>> called twice when I receive a message on the right table.
>>>> Here is my Topology:
>>>> Serde<Author> authorSerde = ...
>>>> Serde<Set<Book>> bSetSerde = ...
>>>> Serde<Set<AutorPublisherAssociation>> apSetSerde = ...
>>>>
>>>> KTable<String, Author> authorTable = builder.table(AUTHOR_TOPIC,
>>>>     Consumed.with(Serdes.String(), authorSerde));
>>>> KTable<String, Set<Book>> booksByAuthorTable = builder.table(BOOKS_BY_AUTHOR,
>>>>     Consumed.with(Serdes.String(), bSetSerde));
>>>> KTable<String, Set<AutorPublisherAssociation>> apTable =
>>>>     builder.table(PUBLISHER_ASSOCIATIONS_BY_AUTHOR,
>>>>         Consumed.with(Serdes.String(), apSetSerde));
>>>>
>>>> // Note: both joiners below modify and return the Author they receive.
>>>> KTable<String, Author> enrichedAuthorTable = authorTable
>>>>     .leftJoin(apTable, (a, apSet) -> {
>>>>         if (apSet == null) {
>>>>             // No association yet, or a tombstone: clear the publishers.
>>>>             a.setPublishers(new HashSet<>());
>>>>         } else {
>>>>             a.setPublishers(apSet.stream()
>>>>                 .map(ap -> ap.getPublisher())
>>>>                 .collect(Collectors.toSet()));
>>>>         }
>>>>         return a;
>>>>     })
>>>>     .leftJoin(booksByAuthorTable, (a, b) -> {
>>>>         a.setBooks(b);
>>>>         return a;
>>>>     });
>>>>
>>>> enrichedAuthorTable.toStream().to(ENRICHED_AUTHORS,
>>>>     Produced.with(Serdes.String(), authorSerde));
>>>> Note that I have 3 topics, all of them keyed by author:
>>>> - AUTHOR_TOPIC is keyed by authorKey and has the Author message;
>>>> - BOOKS_BY_AUTHOR is keyed by authorKey and has a Set of Books;
>>>> - PUBLISHER_ASSOCIATIONS_BY_AUTHOR is keyed by authorKey and has a Set
>>>>   of AutorPublisherAssociation (a POJO that links one author to one
>>>>   publisher).
>>>> Also note that the `if` is intended to avoid NPEs and to handle
>>>> tombstones: if I want to delete the list of publishers associated
>>>> with an author, a tombstone sent to the PUBLISHER_ASSOCIATIONS_BY_AUTHOR
>>>> topic would override the list of publishers on the Author.
>>>> In my simple test case, I am not sending any updates, just one message
>>>> on each topic, in this order: author, booksByAuthor, publisherByAuthor.
>>>> When the author arrives, both ValueJoiners are called with the author
>>>> message and null for the right table.
>>>> When the set of books arrives, each joiner is called ONCE: the first
>>>> joiner receives the author and null, the second joiner receives the
>>>> author and the set of books.
>>>> The problem comes next:
>>>> When the set of AutorPublisherAssociation arrives, the first ValueJoiner
>>>> is called TWICE: once with the author and apSet, and a second time with
>>>> the author and null.
>>>> I don't understand why, in this scenario, the ValueJoiner is called
>>>> twice, with null instead of the message on the last call, overriding
>>>> the correct value.
>>>> Thanks
>>>> Murilo
