kafka-users mailing list archives

From Steven Schlansker <sschlans...@opentable.com>
Subject Re: Finding StreamsMetadata with value-dependent partitioning
Date Wed, 07 Jun 2017 16:40:58 GMT
Thank you for the idea, I'll keep that in mind if I run into limitations of
my current approach.

> On Jun 6, 2017, at 5:50 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> 
> Thanks Steven, interesting use case.
> 
> The current streams state store metadata discovery is assuming the
> `DefaultStreamPartitioner` is used, which is a limitation for such cases.
> 
> Another workaround that I can think of is that you can first partition on
> D in the first stage to let the workers do the "real" work, then pipe the
> output to a second stage where you re-partition on K; the second processor
> exists only to materialize the store for querying. I'm not sure it would
> be better, since it may double the store space (one copy in the first
> processor and one in the second). Also, since you can hold the whole
> K -> D map in a global state, the map seems small enough that the
> repartitioning is probably not worth it.
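A plain-Java sketch of the two-stage layout described above, with no Kafka dependency: per-partition `HashMap`s stand in for state stores, and `partitionOf` is a hypothetical hash in place of a real partitioner. It only illustrates the trade-off: a query needs only K, but every message is stored twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the two-stage idea; no Kafka dependency.
// partitionOf() is a hypothetical stand-in for a real partitioner,
// and the per-partition HashMaps stand in for state stores.
class TwoStageModel {
    static final class Message {
        final String key;          // primary key K
        final String destination;  // destination D
        final String body;

        Message(String key, String destination, String body) {
            this.key = key;
            this.destination = destination;
            this.body = body;
        }
    }

    private final int numPartitions;
    // Stage 1: partitioned by D, where ordering and rate limiting happen.
    private final List<Map<String, Message>> stage1 = new ArrayList<>();
    // Stage 2: re-partitioned by K, materialized only for queries.
    private final List<Map<String, Message>> stage2 = new ArrayList<>();

    TwoStageModel(int numPartitions) {
        this.numPartitions = numPartitions;
        for (int i = 0; i < numPartitions; i++) {
            stage1.add(new HashMap<>());
            stage2.add(new HashMap<>());
        }
    }

    static int partitionOf(String s, int numPartitions) {
        // Hypothetical hash; Kafka's default partitioner hashes serialized key bytes instead.
        return Math.floorMod(s.hashCode(), numPartitions);
    }

    void process(Message m) {
        stage1.get(partitionOf(m.destination, numPartitions)).put(m.key, m);
        stage2.get(partitionOf(m.key, numPartitions)).put(m.key, m);
    }

    // A query needs only K, because stage 2 is partitioned by K alone.
    Optional<Message> query(String key) {
        return Optional.ofNullable(stage2.get(partitionOf(key, numPartitions)).get(key));
    }

    // Total records held across both stages (each message is stored twice).
    int storedCopies() {
        int total = 0;
        for (int i = 0; i < numPartitions; i++) {
            total += stage1.get(i).size() + stage2.get(i).size();
        }
        return total;
    }
}
```

Processing two messages and then calling `query("k1")` returns the first one, while `storedCopies()` reports four records for two messages: the doubled store space mentioned above.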
> 
> 
> Guozhang
> 
> On Tue, Jun 6, 2017 at 8:36 AM, Michael Noll <michael@confluent.io> wrote:
> 
>> Happy to hear you found a working solution, Steven!
>> 
>> -Michael
>> 
>> 
>> 
>> On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <sschlansker@opentable.com> wrote:
>> 
>>>> 
>>>> On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>> 
>>>> Thanks. That helps to understand the use case better.
>>>> 
>>>> Rephrase to make sure I understood it correctly:
>>>> 
>>>> 1) you are providing a custom partitioner to Streams that is based on
>>>> one field in your value (that's fine with regard to fault-tolerance :))
>>>> 2) you want to use interactive queries to query the store
>>>> 3) because of your custom partitioning scheme, you need to manually
>>>> figure out the right application instance that hosts a key
>>>> 4) thus, you use a GlobalKTable to maintain the mapping from K to D,
>>>> and thus to the partition, i.e., the Streams instance that hosts K
>>>> 
>>>> If this is correct, then you cannot use the "by key" metadata interface.
>>>> It's designed to find the Streams instance based on the key only, but
>>>> your partitioner is based on the value. Internally, we call
>>>>
>>>>> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
>>>>
>>>> Note that `value == null`: at this point, we don't have any value
>>>> available and can't provide it to the partitioner.
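To make the failure mode concrete, here is a minimal sketch assuming a hypothetical local interface, `KeyValuePartitioner`, with the same three-argument shape as the call quoted above. A partitioner that reads a field of the value works when a value is supplied, but throws a `NullPointerException` when a key-only lookup passes `null`.

```java
// Hypothetical local interface with the same three-argument shape as the
// partitioner call quoted above: partition(key, value, numPartitions).
interface KeyValuePartitioner<K, V> {
    Integer partition(K key, V value, int numPartitions);
}

class NullValueDemo {
    static final class Delivery {
        final String destination; // field D used for partitioning

        Delivery(String destination) {
            this.destination = destination;
        }
    }

    // Value-dependent partitioner: routes on the value's destination field.
    static final KeyValuePartitioner<String, Delivery> BY_DESTINATION =
            (key, value, numPartitions) -> Math.floorMod(value.destination.hashCode(), numPartitions);

    // Mirrors a key-only metadata lookup: there is no value, so null is passed.
    static Integer partitionForKeyOnly(KeyValuePartitioner<String, Delivery> partitioner,
                                       String key, int numPartitions) {
        return partitioner.partition(key, null, numPartitions);
    }
}
```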
>>>> 
>>>> Thus, your approach to get all metadata is the only way you can go.
>>> 
>>> Thanks for confirming this. The code is a little ugly, but I've done
>>> worse :)
>>> 
>>>> 
>>>> 
>>>> Very interesting (and quite special) use case. :)
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> On 6/2/17 2:32 PM, Steven Schlansker wrote:
>>>>> 
>>>>>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>> 
>>>>>> I am not sure if I understand the use case correctly. Could you give
>>>>>> some more context?
>>>>> 
>>>>> Happily, thanks for thinking about this!
>>>>> 
>>>>>> 
>>>>>>> backing store whose partitioning is value dependent
>>>>>> 
>>>>>> I infer that you are using a custom store and not the default RocksDB?
>>>>>> If yes, what do you use? What does "value dependent" mean in this
>>>>>> context?
>>>>> 
>>>>> We're currently using the basic in-memory store. We tried to use
>>>>> RocksDB, but tuning it to run properly in a Linux container without
>>>>> tripping the cgroups OOM killer is nontrivial.
>>>>> 
>>>>> 
>>>>>> Right now, I am wondering why you don't just set a new key to get your
>>>>>> data grouped by the field you are interested in? Also, if you don't
>>>>>> partition your data by key, you might break your Streams application
>>>>>> with regard to fault-tolerance. Or does your custom store not rely on
>>>>>> changelog backup for fault-tolerance?
>>>>>> 
>>>>> 
>>>>> That's an interesting point about making a transformed key. But I don't
>>>>> think it simplifies my problem much. Essentially, I have a list of
>>>>> messages that should get delivered to destinations. Each message has a
>>>>> primary key K and a destination D.
>>>>> 
>>>>> We partition over D so that all messages to the same destination are
>>>>> handled by the same worker, to preserve ordering, implement local rate
>>>>> limits, etc.
>>>>> 
>>>>> I want to preserve the illusion for the client that they can look up a
>>>>> key with only K. So, as an intermediate step, we use the GlobalKTable to
>>>>> look up D. Once we have K,D we can compute the partition and execute a
>>>>> lookup.
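The K -> D -> partition step can be sketched as follows, assuming a plain `Map` as a stand-in for the K -> D GlobalKTable and a hypothetical `partitionOf` hash in place of the topology's custom partitioner. The one hard requirement is that this computation agrees exactly with the partitioner used when the data was written.

```java
import java.util.Map;
import java.util.OptionalInt;

// Sketch of the two-step lookup: K -> D via a global table, then D -> partition.
// The Map is a hypothetical stand-in for the K -> D GlobalKTable, and
// partitionOf() stands in for the topology's custom partitioner.
class DestinationLookup {
    private final Map<String, String> keyToDestination;
    private final int numPartitions;

    DestinationLookup(Map<String, String> keyToDestination, int numPartitions) {
        this.keyToDestination = keyToDestination;
        this.numPartitions = numPartitions;
    }

    // Must produce the same result as the partitioner the topology uses for D.
    static int partitionOf(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    // Returns the partition that holds K, or empty if K is not in the global table.
    OptionalInt partitionForKey(String key) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(partitionOf(destination, numPartitions));
    }
}
```

Two keys with the same destination always map to the same partition, which is what preserves per-destination ordering; an unknown K yields an empty result rather than a guess.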
>>>>> 
>>>>> Transforming the key into a composite K,D isn't helpful because the end
>>>>> user still only knows K (D's relevance is an implementation detail I
>>>>> wish to hide), so you still need some sort of secondary lookup.
>>>>> 
>>>>> We do use the changelog backup for fault tolerance. How would basing the
>>>>> partition on the value break this? Is the changelog implicitly
>>>>> partitioned by a partitioner other than the one we give to the topology?
>>>>> 
>>>>> Hopefully that explains my situation a bit more?  Thanks!
>>>>> 
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
>>>>>>> I have a KTable and backing store whose partitioning is value
>>>>>>> dependent. I want certain groups of messages to be ordered, and that
>>>>>>> grouping is determined by one field (D) of the (possibly large) value.
>>>>>>> 
>>>>>>> When I look up by only K, obviously you don't know which partition it
>>>>>>> should be on. So I will build a GlobalKTable of K -> D. This gives me
>>>>>>> enough information to determine the partition.
>>>>>>> 
>>>>>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use case
>>>>>>> well. It lets you either get all metadata or get it by key, but if you
>>>>>>> look up by key it just substitutes a null value (causing a downstream
>>>>>>> NPE).
>>>>>>> 
>>>>>>> I can iterate over all metadata and compute the mapping of
>>>>>>> K -> K,D -> P and then iterate over all metadata looking for P. It's
>>>>>>> not difficult, but it ends up being somewhat ugly code that feels like
>>>>>>> I shouldn't have to write it.
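Given a partition P computed from K and D, the scan over all metadata can be sketched like this. `InstanceInfo` is a hypothetical stand-in for `StreamsMetadata`, and the sketch assumes a single source topic, so partitions are tracked by number only. In Kafka Streams code of this era the collection would presumably come from `KafkaStreams#allMetadataForStore(...)`, with each entry's `topicPartitions()` and `hostInfo()` supplying the fields modeled here.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

// Plain-Java model of "scan all metadata for the owner of partition P".
// InstanceInfo is a hypothetical stand-in for StreamsMetadata; it assumes a
// single source topic, so partitions are tracked by number only.
class HostFinder {
    static final class InstanceInfo {
        final String host;
        final int port;
        final Set<Integer> partitions; // source-topic partitions assigned to this instance

        InstanceInfo(String host, int port, Set<Integer> partitions) {
            this.host = host;
            this.port = port;
            this.partitions = partitions;
        }
    }

    // Linear scan over every instance, looking for the one that owns the partition.
    static Optional<InstanceInfo> findOwner(Collection<InstanceInfo> all, int partition) {
        for (InstanceInfo info : all) {
            if (info.partitions.contains(partition)) {
                return Optional.of(info);
            }
        }
        return Optional.empty(); // no instance currently reports this partition
    }
}
```

The scan is linear in the number of instances, which is typically small compared with the key space, so the cost is mostly the extra code rather than runtime.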
>>>>>>> 
>>>>>>> Am I missing something here? Is there a better way that I've missed?
>>>>>>> Thanks!
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
> 
> 
> 
> --
> -- Guozhang

