kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Issues with KTable to KTable leftJoin
Date Fri, 01 Feb 2019 19:38:04 GMT
Sounds like expected behavior to me.

Note that, by default, the result KTable of a KTable-KTable join is not
materialized. To compute the correct result in this case, we need to send
both the old and the new join result downstream, so that the downstream
join can compute its correct result. Because the old result is not stored,
it is recomputed by calling the ValueJoiner again, which is why you see a
second call. It's a storage/computation trade-off.
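To illustrate, here is a simplified, hypothetical model in plain Java of what sending the old and new join result means when an update arrives on the right-hand table. It is not the actual Kafka Streams source, and `Change` and `onRightUpdate` are made-up names. It assumes old values are requested (sendOldValues = true, as reported in the quoted message). The current left value is looked up once, the joiner runs for the new right value and again for the old one, and both results travel downstream as a pair.

```java
import java.util.function.BiFunction;

public class JoinChangeModel {

    // Hypothetical (new, old) pair; illustrates the idea, not Kafka's own class.
    static final class Change<V> {
        final V newValue;
        final V oldValue;

        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Simplified model of an update arriving on the right-hand table, assuming
    // old values are requested: the current left value is looked up once, the
    // joiner is applied to the new right value AND to the old right value, and
    // both results are forwarded together so a downstream join can compute its
    // own correct result.
    static <L, R, V> Change<V> onRightUpdate(L leftValue, R newRight, R oldRight,
                                             BiFunction<L, R, V> joiner) {
        V newResult = joiner.apply(leftValue, newRight); // first joiner call
        V oldResult = joiner.apply(leftValue, oldRight); // second joiner call
        return new Change<>(newResult, oldResult);
    }

    public static void main(String[] args) {
        // A joiner that returns a fresh value and never mutates its inputs.
        BiFunction<String, String, String> joiner =
            (author, pubs) -> author + "|" + (pubs == null ? "none" : pubs);

        // First publisher update for this key, so the old right value is null.
        Change<String> change = onRightUpdate("author-1", "pubA,pubB", null, joiner);
        System.out.println("new=" + change.newValue + " old=" + change.oldValue);
        // prints: new=author-1|pubA,pubB old=author-1|none
    }
}
```

With a joiner like this one, which builds a new value instead of modifying its inputs, the two calls do not interfere with each other.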

Does this answer your question?


-Matthias

On 1/29/19 1:51 PM, Murilo Tavares wrote:
> Adding a bit more to this: it looks like an issue with cascading
> leftJoins...
> I can see that internally, the ValueJoiner is being called by a
> KTableKTableRightJoin$KTableKTableRightJoinProcessor, with sendOldValues =
> true.
> I'm not quite sure, but it looks to me that when we create
> "tmp = A.leftJoin(B)", Kafka Streams internally also creates B.rightJoin(A)
> and sets sendOldValues on A. Later, calling tmp.leftJoin(C) could be
> setting sendOldValues on B, or something like that...
> Are there any known issues with cascading leftJoins?
> Thanks
> Murilo
> 
> 
> 
> On Tue, 29 Jan 2019 at 14:10, Murilo Tavares <murilofla@gmail.com> wrote:
> 
>> Hi,
>> I am trying to understand why the ValueJoiner of a KTable-to-KTable left
>> join is called twice when I receive a message on the right table.
>> Here is my topology:
>>
>> Serde<Author> authorSerde = ...
>> Serde<Set<Book>> bSetSerde = ...
>> Serde<Set<AutorPublisherAssociation>> apSetSerde = ...
>>
>> KTable<String, Author> authorTable = builder.table(AUTHOR_TOPIC,
>>     Consumed.with(Serdes.String(), authorSerde));
>> KTable<String, Set<Book>> booksByAuthorTable = builder.table(BOOKS_BY_AUTHOR,
>>     Consumed.with(Serdes.String(), bSetSerde));
>> KTable<String, Set<AutorPublisherAssociation>> apTable =
>>     builder.table(PUBLISHER_ASSOCIATIONS_BY_AUTHOR,
>>         Consumed.with(Serdes.String(), apSetSerde));
>>
>> KTable<String, Author> enrichedAuthorTable = authorTable
>>     .leftJoin(apTable, (a, apSet) -> {
>>         if (apSet == null) {
>>             a.setPublishers(new HashSet<>());
>>         } else {
>>             a.setPublishers(apSet.stream()
>>                 .map(ap -> ap.getPublisher())
>>                 .collect(Collectors.toSet()));
>>         }
>>         return a;
>>     })
>>     .leftJoin(booksByAuthorTable, (a, b) -> {
>>         a.setBooks(b);
>>         return a;
>>     });
>>
>> enrichedAuthorTable.toStream().to(ENRICHED_AUTHORS,
>>     Produced.with(Serdes.String(), authorSerde));
>>
>> Note that I have 3 topics, all of them keyed by author:
>> - AUTHOR_TOPIC is keyed by authorKey and holds the Author message;
>> - BOOKS_BY_AUTHOR is keyed by authorKey and holds a Set of Books;
>> - PUBLISHER_ASSOCIATIONS_BY_AUTHOR is keyed by authorKey and holds a set of
>>   AutorPublisherAssociation (a POJO that links one author to one
>>   publisher).
>>
>> Also note that the if statement is there to avoid NPEs and to handle
>> tombstones: if I want to delete the list of publishers associated with an
>> author, a tombstone sent to the PUBLISHER_ASSOCIATIONS_BY_AUTHOR topic
>> should overwrite the list of publishers on the Author.
>>
>> In my simple test case I am not sending any updates, just one message on
>> each topic, in this order: author, booksByAuthor, publisherByAuthor.
>> When the author arrives, both ValueJoiners are called with the author
>> message and null for the right table.
>> When the set of books arrives, both joiners are called ONCE: the first
>> joiner receives the author and null, the second joiner receives the author
>> and the set of books.
>> The problem comes next:
>> When the set of AutorPublisherAssociation arrives, the first ValueJoiner
>> is called TWICE: once with the author and apSet, and a second time with
>> the author and null.
>>
>> I don't understand why, in this scenario, the ValueJoiner is called twice,
>> the last time with null instead of the message, overwriting the correct
>> value.
>>
>> Thanks
>> Murilo
>>
>>
>>
> 
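On the joiners in the quoted topology: both mutate the Author they receive and return that same object. The sketch below is a hypothetical, simplified model in plain Java, not Kafka Streams' source; `Author`, `joinNewAndOld`, `mutating` and `copying` are illustrative names. It assumes the new-value call and the old-value call receive the same Author instance. Under that assumption, the second call (with null) overwrites the publishers on the object already returned as the new result, which would be consistent with the overwritten value described above. A joiner that works on a copy keeps both results independent.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

public class MutatingJoinerDemo {

    // Minimal, hypothetical stand-in for the Author POJO from the quoted topology.
    static final class Author {
        final String name;
        Set<String> publishers = new HashSet<>();

        Author(String name) {
            this.name = name;
        }

        // Copies all fields so the caller gets an independent object.
        Author copy() {
            Author c = new Author(name);
            c.publishers = new HashSet<>(publishers);
            return c;
        }
    }

    // Simplified, hypothetical model of a join processor: the same left value is
    // passed to the joiner for the new right value, then for the old right value.
    static Author[] joinNewAndOld(Author left, Set<String> newRight, Set<String> oldRight,
                                  BiFunction<Author, Set<String>, Author> joiner) {
        Author newResult = joiner.apply(left, newRight);
        Author oldResult = joiner.apply(left, oldRight);
        return new Author[] {newResult, oldResult};
    }

    // Mirrors the quoted joiner: it mutates and returns the input Author.
    static Author mutating(Author a, Set<String> pubs) {
        if (pubs == null) {
            a.publishers = new HashSet<>();
        } else {
            a.publishers = new HashSet<>(pubs);
        }
        return a;
    }

    // Variant that works on a copy, so each call yields an independent result.
    static Author copying(Author a, Set<String> pubs) {
        Author result = a.copy();
        if (pubs == null) {
            result.publishers = new HashSet<>();
        } else {
            result.publishers = new HashSet<>(pubs);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> pubs = Collections.singleton("pubA");

        Author[] viaMutation = joinNewAndOld(new Author("author-1"), pubs, null,
                                             MutatingJoinerDemo::mutating);
        Author[] viaCopy = joinNewAndOld(new Author("author-1"), pubs, null,
                                         MutatingJoinerDemo::copying);

        // The "new" result is the same object the second (null) call cleared.
        System.out.println("mutating new publishers: " + viaMutation[0].publishers);
        // prints: mutating new publishers: []
        // The copy-based joiner keeps the new result intact.
        System.out.println("copying  new publishers: " + viaCopy[0].publishers);
        // prints: copying  new publishers: [pubA]
    }
}
```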

