kafka-users mailing list archives

From Steven Schlansker <sschlans...@opentable.com>
Subject Re: Finding StreamsMetadata with value-dependent partitioning
Date Fri, 02 Jun 2017 22:53:08 GMT
> 
> On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> Thanks. That helps to understand the use case better.
> 
> Rephrase to make sure I understood it correctly:
> 
> 1) you are providing a custom partitioner to Streams that is based on one
> field in your value (that's fine with regard to fault-tolerance :))
> 2) you want to use interactive queries to query the store
> 3) because of your custom partitioning schema, you need to manually
> figure out the right application instance that hosts a key
> 4) thus, you use a GlobalKTable to maintain the information from K to D
> and thus to the partition, i.e., the Streams instance that hosts K
> 
> If this is correct, then you cannot use the "by key" metadata interface.
> It's designed to find the Streams instance based on the key only -- but
> your partitioner is based on the value. Internally, we call
> 
>> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
> 
> Note that `value==null` -- at this point, we don't have any value
> available and can't provide it to the partitioner.
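To make the failure concrete, here is a minimal, self-contained Java sketch of a value-dependent partitioner. The `Message` class, its `destination` field, and the `String.hashCode()`-based hash are hypothetical stand-ins, not Kafka's API or its default partitioning; the `partition` method only mirrors the `(key, value, numPartitions)` shape of the call quoted above. Because the partition is computed from D, passing `null` for the value, as the key-only metadata lookup does, fails with a `NullPointerException`, the same kind of downstream NPE reported in the original message at the bottom of the thread.

```java
public class DestinationPartitionerDemo {

    // Hypothetical message value; only the destination field D drives partitioning.
    static final class Message {
        final String destination;
        final String body;

        Message(String destination, String body) {
            this.destination = destination;
            this.body = body;
        }
    }

    // Hypothetical value-dependent partitioner with the (key, value, numPartitions)
    // shape of the call quoted above. The key K is ignored; only D matters.
    static int partition(String key, Message value, int numPartitions) {
        // Dereferencing value throws NullPointerException when value == null,
        // which is what a key-only metadata lookup passes in.
        return Math.floorMod(value.destination.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int p1 = partition("k1", new Message("dest-a", "first"), 8);
        int p2 = partition("k2", new Message("dest-a", "second"), 8);
        // Same destination, different keys: same partition, so per-D ordering holds.
        System.out.println(p1 == p2); // prints true

        try {
            partition("k1", null, 8);
        } catch (NullPointerException e) {
            System.out.println("null value -> NullPointerException");
        }
    }
}
```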
> 
> Thus, your approach of getting all metadata is the only way you can go.

Thanks for confirming this.  The code is a little ugly but I've done worse :)
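The "get all metadata, then look for the partition" scan can be sketched as follows, assuming a single source topic so that each application instance can be summarized by its host, port, and owned partition numbers. `InstanceMetadata` and `findOwner` are hypothetical stand-ins; in a real application the collection would presumably come from `KafkaStreams.allMetadataForStore(...)`, where each `StreamsMetadata` exposes `hostInfo()` and `topicPartitions()`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class MetadataScan {

    // Hypothetical stand-in for StreamsMetadata: one instance's host:port plus the
    // partitions of the (single, assumed) source topic that it owns.
    static final class InstanceMetadata {
        final String host;
        final int port;
        final Set<Integer> partitions;

        InstanceMetadata(String host, int port, Set<Integer> partitions) {
            this.host = host;
            this.port = port;
            this.partitions = partitions;
        }
    }

    // Linear scan over all instances; returns the one that owns partition p, if any.
    static Optional<InstanceMetadata> findOwner(Collection<InstanceMetadata> all, int p) {
        for (InstanceMetadata md : all) {
            if (md.partitions.contains(p)) {
                return Optional.of(md);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<InstanceMetadata> all = List.of(
                new InstanceMetadata("app-1", 7070, Set.of(0, 2)),
                new InstanceMetadata("app-2", 7070, Set.of(1, 3)));
        findOwner(all, 3).ifPresent(md -> System.out.println(md.host + ":" + md.port)); // prints app-2:7070
    }
}
```

An empty result means no instance currently reports owning that partition, so the caller has to decide whether to retry or fail the query.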

> 
> 
> Very interesting (and quite special) use case. :)
> 
> 
> -Matthias
> 
> On 6/2/17 2:32 PM, Steven Schlansker wrote:
>> 
>>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>> 
>>> I am not sure if I understand the use case correctly. Could you give
>>> some more context?
>> 
>> Happily, thanks for thinking about this!
>> 
>>> 
>>>> backing store whose partitioning is value dependent
>>> 
>>> I infer that you are using a custom store and not the default RocksDB? If
>>> yes, what do you use? What does "value dependent" mean in this context?
>> 
>> We're currently using the basic in-memory store.  We tried to use RocksDB
>> but the tuning to get it running appropriately in a Linux container without
>> tripping the cgroups OOM killer is nontrivial.
>> 
>> 
>>> Right now, I am wondering why you don't just set a new key to get your
>>> data grouped by the field you are interested in. Also, if you don't
>>> partition your data by key, you might break your streams application
>>> with regard to fault-tolerance -- or does your custom store not rely on
>>> changelog backup for fault-tolerance?
>>> 
>> 
>> That's an interesting point about making a transformed key.  But I don't think
>> it simplifies my problem too much.  Essentially, I have a list of messages
>> that should get delivered to destinations.  Each message has a primary key K
>> and a destination D.
>> 
>> We partition over D so that all messages to the same destination are handled by
>> the same worker, to preserve ordering and implement local rate limits etc.
>> 
>> I want to preserve the illusion to the client that they can look up a key with
>> only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  Once
>> we have K,D we can then compute the partition and execute a lookup.
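The two-step lookup described above (K to D via the global table, then D to a partition) might look like the following sketch. A plain `Map` stands in for the GlobalKTable's local store, and `partitionForDestination` is a hypothetical hash; in practice it would have to compute exactly what the topology's custom partitioner computes from D, or the lookup would be routed to the wrong instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

public class KeyToPartition {

    // Hypothetical hash; must match whatever the topology's partitioner does with D.
    static int partitionForDestination(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    // Step 1: K -> D via the global K -> D table (a plain Map stands in here).
    // Step 2: D -> P using the same function the partitioner applies to the value.
    static OptionalInt partitionForKey(Map<String, String> keyToDestination, String key, int numPartitions) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return OptionalInt.empty(); // unknown K: nothing to route to
        }
        return OptionalInt.of(partitionForDestination(destination, numPartitions));
    }

    public static void main(String[] args) {
        Map<String, String> kToD = new HashMap<>();
        kToD.put("msg-1", "dest-a");
        kToD.put("msg-2", "dest-a");
        kToD.put("msg-3", "dest-b");

        int n = 8;
        System.out.println(partitionForKey(kToD, "msg-1", n));
        System.out.println(partitionForKey(kToD, "missing", n)); // prints OptionalInt.empty
    }
}
```

The resulting partition number is what the client then matches against the instances' metadata to find the host to query.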
>> 
>> Transforming the key to be a composite K,D isn't helpful because the end user still
>> only knows K -- D's relevance is an implementation detail I wish to hide -- so you still
>> need some sort of secondary lookup.
>> 
>> We do use the changelog backup for fault tolerance -- how would having the partition
>> based on the value break this?  Is the changelog implicitly partitioned by a partitioner
>> other than the one we give to the topology?
>> 
>> Hopefully that explains my situation a bit more?  Thanks!
>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> 
>>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>>>> I have a KTable and backing store whose partitioning is value dependent.
>>>> I want certain groups of messages to be ordered and that grouping is determined
>>>> by one field (D) of the (possibly large) value.
>>>> 
>>>> When I look up by only K, obviously you don't know the partition it should be on.
>>>> So I will build a GlobalKTable of K -> D.  This gives me enough information
>>>> to determine the partition.
>>>> 
>>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
>>>> It allows you to either get all metadata or get metadata by key -- but if you look up by key
>>>> it just substitutes a null value (causing a downstream NPE).
>>>> 
>>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
>>>> and then iterate over all metadata looking for P.  It's not difficult, but it ends
>>>> up being a bit of somewhat ugly code that feels like I shouldn't have to write it.
>>>> 
>>>> Am I missing something here?  Is there a better way that I've missed?  Thanks!
>>>> 
>>> 
>> 
> 

