kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Finding StreamsMetadata with value-dependent partitioning
Date Fri, 02 Jun 2017 22:32:35 GMT
Thanks. That helps to understand the use case better.

Let me rephrase to make sure I understood it correctly:

1) you are providing a custom partitioner to Streams that is based on one
field in your value (that's fine with regard to fault-tolerance :))
2) you want to use interactive queries to query the store
3) because of your custom partitioning scheme, you need to manually
figure out the right application instance that hosts a key
4) thus, you use a GlobalKTable to maintain the mapping from K to D,
and thus to the partition, i.e., the Streams instance that hosts K

If this is correct, then you cannot use the "by key" metadata interface.
It's designed to find the Streams instance based on the key only -- but
your partitioner is based on the value. Internally, we call

> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);

Note that `value==null` -- at this point, we don't have any value
available and can't provide it to the partitioner.

Thus, your approach to get all metadata is the only way you can go.
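
A minimal sketch of that "scan all metadata" lookup, in plain Java so it
runs on its own. Everything named here is an assumption for illustration:
`InstanceMetadata` is a hypothetical stand-in for what `StreamsMetadata`
gives you (host, port, hosted partitions), the `keyToDestination` map stands
in for the GlobalKTable of K -> D, and `partitionFor` is a made-up rule that
must be replaced by whatever your real custom partitioner computes from D.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ValuePartitionedLookup {

    /** Hypothetical stand-in for StreamsMetadata: an endpoint plus the partitions it hosts. */
    static final class InstanceMetadata {
        final String host;
        final int port;
        final Set<Integer> partitions;

        InstanceMetadata(String host, int port, Integer... partitions) {
            this.host = host;
            this.port = port;
            this.partitions = new HashSet<>(Arrays.asList(partitions));
        }
    }

    /** Assumed partitioning rule on D; it must mirror the real custom partitioner. */
    static int partitionFor(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    /** K -> D (secondary lookup) -> partition -> instance hosting that partition. */
    static Optional<InstanceMetadata> findInstance(
            String key,
            Map<String, String> keyToDestination,
            List<InstanceMetadata> allMetadata,
            int numPartitions) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return Optional.empty(); // unknown key: nothing to route to
        }
        int partition = partitionFor(destination, numPartitions);
        return allMetadata.stream()
                .filter(m -> m.partitions.contains(partition))
                .findFirst();
    }
}
```

In a real application the list would come from the metadata of all
instances for the store, and the partition count from the source topic;
the sketch only shows the K -> D -> partition -> instance chain.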


Very interesting (and quite special) use case. :)


-Matthias

On 6/2/17 2:32 PM, Steven Schlansker wrote:
> 
>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>
>> I am not sure if I understand the use case correctly. Could you give
>> some more context?
> 
> Happily, thanks for thinking about this!
> 
>>
>>> backing store whose partitioning is value dependent
>>
>> I infer that you are using a custom store and not default RocksDB? If
>> yes, what do you use? What does "value dependent" mean in this context?
> 
> We're currently using the base in-memory store.  We tried to use RocksDB
> but the tuning to get it running appropriately in a Linux container without
> tripping the cgroups OOM killer is nontrivial.
> 
> 
>> Right now, I am wondering why you do not just set a new key to get your
>> data grouped by the field you are interested in? Also, if you don't
>> partition your data by key, you might break your streams application
>> with regard to fault-tolerance -- or does your custom store not rely on
>> changelog backup for fault-tolerance?
>>
> 
> That's an interesting point about making a transformed key.  But I don't think
> it simplifies my problem too much.  Essentially, I have a list of messages
> that should get delivered to destinations.  Each message has a primary key K
> and a destination D.
> 
> We partition over D so that all messages to the same destination are handled by
> the same worker, to preserve ordering and implement local rate limits etc.
> 
> I want to preserve the illusion to the client that they can look up a key with
> only K.  So, as an intermediate step, we use the GlobalKTable to look up D.  Once
> we have K,D we can then compute the partition and execute a lookup.
> 
> Transforming the key to be a composite K,D isn't helpful because the end user still
> only knows K -- D's relevance is an implementation detail I wish to hide -- so you still
> need some sort of secondary lookup.
> 
> We do use the changelog backup for fault tolerance -- how would having the partition
> based on the value break this?  Is the changelog implicitly partitioned by a partitioner
> other than the one we give to the topology?
> 
> Hopefully that explains my situation a bit more?  Thanks!
> 
>>
>> -Matthias
>>
>>
>>
>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>>> I have a KTable and backing store whose partitioning is value dependent.
>>> I want certain groups of messages to be ordered and that grouping is determined
>>> by one field (D) of the (possibly large) value.
>>>
>>> When I lookup by only K, obviously you don't know the partition it should be on.
>>> So I will build a GlobalKTable of K -> D.  This gives me enough information
>>> to determine the partition.
>>>
>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case well.
>>> It allows you to either get all metadata, or by key -- but if you lookup by key
>>> it just substitutes a null value (causing a downstream NPE)
>>>
>>> I can iterate over all metadata and compute the mapping of K -> K,D -> P
>>> and then iterate over all metadata looking for P.  It's not difficult but ends
>>> up being a bit of somewhat ugly code that feels like I shouldn't have to write it.
>>>
>>> Am I missing something here?  Is there a better way that I've missed?  Thanks!
>>>
>>
> 

