kafka-users mailing list archives

From Michael Noll <mich...@confluent.io>
Subject Re: Finding StreamsMetadata with value-dependent partitioning
Date Tue, 06 Jun 2017 15:36:03 GMT
Happy to hear you found a working solution, Steven!

-Michael



On Sat, Jun 3, 2017 at 12:53 AM, Steven Schlansker <
sschlansker@opentable.com> wrote:

> >
> > On Jun 2, 2017, at 3:32 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> >
> > Thanks. That helps to understand the use case better.
> >
> > Rephrase to make sure I understood it correctly:
> >
> > 1) you are providing a custom partitioner to Streams that is based on one
> > field in your value (that's fine with regard to fault-tolerance :))
> > 2) you want to use interactive queries to query the store
> > 3) because of your custom partitioning schema, you need to manually
> > figure out the right application instance that hosts a key
> > 4) thus, you use a GlobalKTable to maintain the information from K to D
> > and thus to the partition, i.e., the Streams instance that hosts K
> >
> > If this is correct, then you cannot use the "by key" metadata interface.
> > It's designed to find the Streams instance based on the key only -- but
> > your partitioner is based on the value. Internally, we call
> >
> >> final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions);
> >
> > Note that `value==null` -- at this point, we don't have any value
> > available and can't provide it to the partitioner.
> >
> > Thus, your approach to get all metadata is the only way you can go.
>
> Thanks for confirming this.  The code is a little ugly but I've done worse
> :)
>
> >
> >
> > Very interesting (and quite special) use case. :)
> >
> >
> > -Matthias
> >
> > On 6/2/17 2:32 PM, Steven Schlansker wrote:
> >>
> >>> On Jun 2, 2017, at 2:11 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> >>>
> >>> I am not sure if I understand the use case correctly. Could you give
> >>> some more context?
> >>
> >> Happily, thanks for thinking about this!
> >>
> >>>
> >>>> backing store whose partitioning is value dependent
> >>>
> >>> I infer that you are using a custom store and not default RocksDB? If
> >>> yes, what do you use? What does "value dependent" mean in this context?
> >>
> >> We're currently using the base in memory store.  We tried to use RocksDB
> >> but the tuning to get it running appropriately in a Linux container
> without
> >> tripping the cgroups OOM killer is nontrivial.
> >>
> >>
> >>> Right now, I am wondering why you do not just set a new key to get your
> >>> data grouped by the field you are interested in? Also, if you don't
> >>> partition your data by key, you might break your streams application
> >>> with regard to fault-tolerance -- or does your custom store not rely on
> >>> changelog backup for fault-tolerance?
> >>>
> >>
> >> That's an interesting point about making a transformed key.  But I don't
> think
> >> it simplifies my problem too much.  Essentially, I have a list of
> messages
> >> that should get delivered to destinations.  Each message has a primary
> key K
> >> and a destination D.
> >>
> >> We partition over D so that all messages to the same destination are
> handled by
> >> the same worker, to preserve ordering and implement local rate limits
> etc.
> >>
> >> I want to preserve the illusion to the client that they can look up a
> key with
> >> only K.  So, as an intermediate step, we use the GlobalKTable to look
> up D.  Once
> >> we have K,D we can then compute the partition and execute a lookup.
> >>
> >> Transforming the key to be a composite K,D isn't helpful because the
> end user still
> >> only knows K -- D's relevance is an implementation detail I wish to
> hide -- so you still
> >> need some sort of secondary lookup.
> >>
> >> We do use the changelog backup for fault tolerance -- how would having
> the partition
> >> based on the value break this?  Is the changelog implicitly partitioned
> by a partitioner
> >> other than the one we give to the topology?
> >>
> >> Hopefully that explains my situation a bit more?  Thanks!
> >>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 6/2/17 10:34 AM, Steven Schlansker wrote:
> >>>> I have a KTable and backing store whose partitioning is value
> dependent.
> >>>> I want certain groups of messages to be ordered and that grouping is
> determined
> >>>> by one field (D) of the (possibly large) value.
> >>>>
> >>>> When I look up by only K, obviously you don't know the partition it
> should be on.
> >>>> So I will build a GlobalKTable of K -> D.  This gives me enough
> information
> >>>> to determine the partition.
> >>>>
> >>>> Unfortunately, the KafkaStreams metadata API doesn't fit this use
> case well.
> >>>> It allows you to either get all metadata, or by key -- but if you
> look up by key
> >>>> it just substitutes a null value (causing a downstream NPE)
> >>>>
> >>>> I can iterate over all metadata and compute the mapping of K -> K,D
> -> P
> >>>> and then iterate over all metadata looking for P.  It's not difficult
> but ends
> >>>> up being a bit of somewhat ugly code that feels like I shouldn't have
> to write it.
> >>>>
> >>>> Am I missing something here?  Is there a better way that I've
> missed?  Thanks!
> >>>>
> >>>
> >>
> >
>
>
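
For readers of the archive, the workaround discussed in this thread (look up D from K, compute the partition from D, then scan all instance metadata for the one hosting that partition) might look roughly like the sketch below. It is plain Java so that it runs without Kafka on the classpath, and it is not the code Steven actually wrote. `InstanceMetadata`, `ValueDependentLookup`, `partitionFor`, and `findInstance` are hypothetical names: the map stands in for the GlobalKTable of K -> D, `partitionFor` stands in for the custom partitioner, and the list stands in for what `KafkaStreams#allMetadata()` returned in the Kafka versions current at the time of the thread.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for StreamsMetadata: one application instance
// and the source-topic partitions it currently hosts.
final class InstanceMetadata {
    final String host;
    final int port;
    final Set<Integer> partitions;

    InstanceMetadata(String host, int port, Set<Integer> partitions) {
        this.host = host;
        this.port = port;
        this.partitions = partitions;
    }
}

final class ValueDependentLookup {

    // Assumption: the custom partitioner hashes only the destination D.
    // The real partitioner may use a different function; what matters is
    // that the lookup path and the write path agree on it.
    static int partitionFor(String destination, int numPartitions) {
        return Math.floorMod(destination.hashCode(), numPartitions);
    }

    // K -> D (secondary lookup), D -> P (partitioner), P -> instance (scan).
    static Optional<InstanceMetadata> findInstance(
            String key,
            Map<String, String> keyToDestination,
            List<InstanceMetadata> allMetadata,
            int numPartitions) {
        String destination = keyToDestination.get(key);
        if (destination == null) {
            return Optional.empty(); // unknown key: nothing to route to
        }
        int partition = partitionFor(destination, numPartitions);
        return allMetadata.stream()
                .filter(m -> m.partitions.contains(partition))
                .findFirst();
    }
}
```

The scan is linear in the number of instances, which is usually small. The result can go stale after a rebalance, so a caller would likely want to re-read the metadata when a lookup against the chosen instance fails.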
