kafka-users mailing list archives

From Steven Schlansker <sschlans...@opentable.com>
Subject Re: Suppressing intermediate topics feeding (Global)KTable
Date Fri, 02 Jun 2017 21:37:01 GMT

> On Jun 2, 2017, at 2:21 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> Hi,
> 
> If you want to populate a GlobalKTable, you can only do so by reading a
> topic. So the short answer to your headline is: no, you cannot suppress
> the intermediate topic.

Bummer!  Maybe this is an opt-in optimization to consider later.

> 
> However, I am wondering what the purpose of your secondary index is, and
> why you are using a GlobalKTable for it. Maybe you can elaborate a
> little bit?

I elaborated on this a bit in the other thread. I was trying to keep separate
problems separate, but maybe I just made things more confusing!

tl;dr: the user requests values knowing only K, but there is actually a
"hidden composite key" D that is relevant to the partitioning strategy.

The GlobalKTable lets you look up K -> D, and then find the right local KTable (K,D) -> V.
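A minimal plain-Java sketch of that two-step lookup, assuming HashMaps as stand-ins for the stores (this is not the Kafka Streams interactive-query API). The class name, the `CompositeKey` record, and the `partitionFor` hash are hypothetical; Kafka's default partitioner hashes the serialized key with murmur2, not `String.hashCode()`. What it shows is that the shard holding V is chosen by D alone, so a caller who knows only K must consult the K -> D index first. Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model, NOT the Kafka Streams API: HashMaps stand in for the stores.
// All names here are hypothetical.
class TwoStepLookup {
    // Key of the local table: externally visible K plus hidden destination D.
    record CompositeKey(String key, String destination) {}

    // K -> D; every instance holds all of it (GlobalKTable stand-in).
    private final Map<String, String> destIndex = new HashMap<>();
    // One (K,D) -> V map per partition (local KTable shard stand-ins).
    private final List<Map<CompositeKey, String>> shards = new ArrayList<>();

    TwoStepLookup(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Hypothetical partitioner: placement depends on D only, never on K.
    int partitionFor(String destination) {
        return Math.floorMod(destination.hashCode(), shards.size());
    }

    void put(String key, String destination, String value) {
        destIndex.put(key, destination);
        shards.get(partitionFor(destination)).put(new CompositeKey(key, destination), value);
    }

    Optional<String> get(String key) {
        String destination = destIndex.get(key);                                    // step 1: K -> D
        if (destination == null) {
            return Optional.empty();
        }
        Map<CompositeKey, String> shard = shards.get(partitionFor(destination));    // step 2: pick shard by D
        return Optional.ofNullable(shard.get(new CompositeKey(key, destination)));  // step 3: (K,D) -> V
    }
}
```

Because placement depends only on D, re-pointing K at a new destination leaves a stale (K, old D) entry in the old shard; the sketch simply never reads it once the index points elsewhere.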

> 
> I am also wondering about this code snippet:
> 
>>> builder.stream(mainTopic)
>>>       .mapValues(...)
>>>       .to(secondaryIndex1)
> 
> Should it not be .map() that transforms (k,v) ->
> (v.getSecondaryKey1(),k)? Just for my understanding of what you are doing.
> 

In this case, the "externally visible" K needs additional information about
the destination D so that it can be partitioned correctly.  So the code looks
like:

        // TODO: sucks that this materializes an intermediate topic
        msgStream
                .mapValues(v -> v == null ? null : v.getResolvedDestination().toString())
                .to(Serdes.String(), Serdes.String(), DEST_INDEX);

        builder.globalTable(Serdes.String(), Serdes.String(), DEST_INDEX, DEST_INDEX);
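To make the difference between the two snippets concrete, here is a plain-Java sketch (not Kafka Streams code) of the record each variant would emit. `Message`, `secondaryKey1`, and `resolvedDestination` are hypothetical stand-ins for the real value type and its getters. The `mapValues`-style record keeps K, so the index topic stays keyed like the main topic and a null value passes through as a tombstone; the `map`-style record is re-keyed to K1, and a tombstone carries no K1 to re-key by, so this sketch drops it. Requires Java 16+ for records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the records each variant emits; not Kafka Streams code.
// Message and its fields are hypothetical.
class IndexRecords {
    record Message(String secondaryKey1, String resolvedDestination) {}

    // mapValues-style (DEST_INDEX): keep K, replace V with D.
    // A null value stays null, so the index entry for K is deleted downstream.
    static Map.Entry<String, String> destIndexRecord(String key, Message value) {
        return new SimpleEntry<>(key, value == null ? null : value.resolvedDestination());
    }

    // map-style (secondaryIndex1): re-key to K1 and carry K as the value.
    // A tombstone has no K1 to re-key by, so this sketch drops it.
    static Optional<Map.Entry<String, String>> secondaryIndexRecord(String key, Message value) {
        if (value == null) {
            return Optional.empty();
        }
        Map.Entry<String, String> entry = new SimpleEntry<>(value.secondaryKey1(), key);
        return Optional.of(entry);
    }
}
```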

> 
> -Matthias
> 
> 
> On 6/2/17 12:28 PM, Steven Schlansker wrote:
>> Hi everyone, another question for the list :)
>> 
>> I'm creating a cluster of KTables (and GlobalKTables) based on the same
>> input stream K,V.
>> 
>> It has a number of secondary indices (think of an RDBMS):
>> K1 -> K
>> K2 -> K
>> etc
>> 
>> These are all based on trivial mappings from my main stream, which also
>> feeds the K -> V StateStore. Think one-liners like v -> v.getSecondaryKey1().
>> 
>> Currently, for each one it seems that I have to do
>> 
>> builder.stream(mainTopic)
>>       .mapValues(...)
>>       .to(secondaryIndex1)
>> 
>> builder.globalTable(secondaryIndex1, secondaryIndexStore1);
>> 
>> Unfortunately, the intermediate "secondaryIndex1" topic is of relatively
>> low value. If my state stores are lost, I already have to
>> read through the mainTopic to recover the main state store. While it's doing
>> that, I'd much rather it rebuild the GlobalKTable instance from that data
>> directly. Then I could skip having this index in Kafka at all; it's entirely
>> redundant. The data is already loaded and deserialized for the benefit of
>> another Processor.
>> 
>> Any thoughts?  Happy Friday,
>> Steven
>> 
> 
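The single-pass rebuild wished for in the original message can be modelled in plain Java: replaying the main topic once updates the K -> V store and every K1 -> K index together, with no intermediate topics. This is a sketch under stated assumptions, not a Kafka Streams API: `SinglePassIndexes`, its extractor map, and the in-memory `HashMap`s are hypothetical. Unlike a GlobalKTable, a store built this way only covers the records this instance replays, so every instance would need to read all partitions of the main topic to get a global index.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model: one replay of the main topic feeds all stores.
class SinglePassIndexes<V> {
    private final Map<String, V> main = new HashMap<>();                       // K -> V
    private final Map<String, Function<V, String>> extractors;                  // index name -> (V -> Kn)
    private final Map<String, Map<String, String>> indexes = new HashMap<>();   // index name -> (Kn -> K)

    SinglePassIndexes(Map<String, Function<V, String>> extractors) {
        this.extractors = extractors;
        extractors.keySet().forEach(name -> indexes.put(name, new HashMap<>()));
    }

    // Apply one record from the main topic; a null value is a tombstone.
    void apply(String key, V value) {
        V previous = value == null ? main.remove(key) : main.put(key, value);
        for (Map.Entry<String, Function<V, String>> e : extractors.entrySet()) {
            Map<String, String> index = indexes.get(e.getKey());
            if (previous != null) {
                // Drop the stale secondary key, but only if it still points at this K.
                index.remove(e.getValue().apply(previous), key);
            }
            if (value != null) {
                index.put(e.getValue().apply(value), key);
            }
        }
    }

    // Replaying the whole log once rebuilds the main store and every index together.
    void restore(List<? extends Map.Entry<String, V>> log) {
        log.forEach(r -> apply(r.getKey(), r.getValue()));
    }

    V get(String key) {
        return main.get(key);
    }

    String lookup(String indexName, String secondaryKey) {
        return indexes.get(indexName).get(secondaryKey);
    }
}
```

Removing the stale secondary key with `Map.remove(key, value)` matters when a value's secondary key changes: the old entry is dropped only if it still points at the same K.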

