kafka-users mailing list archives

From Raman Gupta <rocketra...@gmail.com>
Subject Re: Race condition with stream use of Global KTable
Date Fri, 12 Apr 2019 21:22:04 GMT
Currently topic-1 and topic-2 have a different number of partitions
(due to vastly different concurrency/processing time requirements). So
in order to accomplish this, I'd also need to open a can of worms and
repartition topic-1 to create topic-1a, so that it can be
co-partitioned with topic-2a, which would have to be rebuilt from
topic-2.
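
To illustrate why the differing partition counts matter, here is a toy Python model (not Kafka code). It assumes a simple hash-modulo partitioner; CRC32 stands in for Kafka's actual hash function, and the partition counts and key names are hypothetical, since the thread only says the counts differ.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Toy hash-modulo partitioner (CRC32 is a stand-in, not Kafka's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical partition counts for the two topics.
TOPIC_1_PARTITIONS = 4
TOPIC_2_PARTITIONS = 12

keys = [f"key-{i}" for i in range(1000)]

# Keys whose partition number differs between the two topics.
mismatched = [
    k for k in keys
    if partition_for(k, TOPIC_1_PARTITIONS) != partition_for(k, TOPIC_2_PARTITIONS)
]
print(f"{len(mismatched)} of {len(keys)} keys map to different partition numbers")
```

Even with an identical partitioning key, the same key can map to different partition numbers once the counts differ, which is why both topics would need to be rebuilt with matching partition counts.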

This is a heck of a lot of hoops to jump through, and in any case,
even if I did this, I'm still unclear on how this actually solves the
race condition (question #3 in my previous message)?
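
For reference, the "retry the `get`" workaround from the original question can be sketched as a bounded retry loop. This is a toy Python model under stated assumptions, not Kafka Streams code: `get_with_retry` and `lagging_lookup` are hypothetical names, and a counter stands in for the global table catching up after a few polls.

```python
import time
from typing import Callable, Optional, TypeVar

V = TypeVar("V")


def get_with_retry(
    lookup: Callable[[], Optional[V]],
    attempts: int = 5,
    delay_s: float = 0.05,
) -> Optional[V]:
    """Call lookup() until it returns a non-None value or attempts run out."""
    for attempt in range(attempts):
        value = lookup()
        if value is not None:
            return value
        if attempt < attempts - 1:
            time.sleep(delay_s)  # wait before polling the table again
    return None


# Hypothetical stand-in for a table that lags: the value appears on the 3rd poll.
calls = {"n": 0}


def lagging_lookup() -> Optional[str]:
    calls["n"] += 1
    return "topic-1 value" if calls["n"] >= 3 else None


result = get_with_retry(lagging_lookup, attempts=5, delay_s=0.0)
print(result, "after", calls["n"], "lookups")
```

A bounded retry like this cannot tell "not written yet" apart from "will never exist", and any sleep blocks the thread that is doing the processing, so it only narrows the race rather than removing it.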

Regards,
Raman

On Fri, Apr 12, 2019 at 1:57 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> As for 2), just to clarify that co-partitioning is still needed to make
> sure that a record from topic-2 can get the expected data from the local
> materialized store from topic-1 --- this would be required for either DSL
> or for processor API.
>
> What I was suggesting is that, when sending to topic-2, by default it will
> be partitioned on the key of the message, and hence it is unlikely to be
> co-partitioned. However, you can supply a `StreamPartitioner`, either via
> stream.to(Produced.withStreamPartitioner)
> for DSL users, or directly via topology.addSink(... StreamPartitioner) for
> Processor users, to switch the partitioning from the message key to a
> scheme that is consistent with the key of topic-1. Then we can
> guarantee they are co-partitioned.
>
> Guozhang
>
> On Tue, Apr 9, 2019 at 7:59 PM Raman Gupta <rocketraman@gmail.com> wrote:
>
> > Hmm, not sure I understand what you are suggesting. Let me address
> > each step in turn:
> >
> > > 1) materialize the topic-1 into a state store
> >
> > Ok, I think that's basically what we have with the global k-table I
> > showed in the topology, or did you mean something else, like using the
> > Processor API to populate our own state store?
> >
> > > 2) use the same key as the partitioning key when writing to topic-2 to
> > > make sure it is co-partitioned (this is okay as long as the resulting
> > > stream from topic-2 does not need to rely on being partitioned by the
> > > key to perform other operations)
> >
> > Our situation is the latter. The key in topic-2 is essentially a
> > unique id, and partitioning should therefore be pretty random. Also
> > remember that the cardinality of topic-1 to topic-2 is 1:n, so there
> > are multiple different keys on topic-2 for each key in topic-1.
> >
> > > 3) we can just issue `get` as part of the lower-level processor API
> > > rather than performing a join to query the materialized table from
> > > topic-1.
> >
> > Right now, we're already doing a `get` on the global k-table as part
> > of the DSL API. Does it make a difference whether we use the Processor
> > API? How does that resolve the race condition?
> >
> > Regards,
> > Raman
> >
> >
> >
> > On Fri, Apr 5, 2019 at 12:00 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > I see.
> > >
> > > So back to your original question: yes, there will be a race condition,
> > > since the global ktable is updated by a separate thread from the one
> > > that reads from topic-2, processes the record, and queries the global
> > > ktable.
> > >
> > > What I can think of off the top of my head is to 1) materialize the
> > > topic-1 into a state store, and 2) use the same key as the partitioning
> > > key when writing to topic-2 to make sure it is co-partitioned (this is
> > > okay as long as the resulting stream from topic-2 does not need to rely
> > > on being partitioned by the key to perform other operations), and then
> > > 3) we can just issue `get` as part of the lower-level processor API
> > > rather than performing a join to query the materialized table from
> > > topic-1. Does that make sense?
> > >
> > > Guozhang
> > >
> > > On Thu, Apr 4, 2019 at 7:38 AM Raman Gupta <rocketraman@gmail.com> wrote:
> > >
> > > > Yes, the stream transformation of `topic-1` to `topic-2` is a
> > > > heavyweight operation producing completely different information on
> > > > topic-2 than is contained on topic-1 (the cardinality is 1-n as well,
> > > > not 1-1). The schema evolution I am attempting to perform should have
> > > > captured the data at time of write of topic-2 but didn't. It is easily
> > > > available in topic-1 though, using some other information in the
> > > > payload of topic-2.
> > > >
> > > > Regards,
> > > > Raman
> > > >
> > > > On Thu, Apr 4, 2019 at 12:57 AM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > >
> > > > > Hi Raman,
> > > > >
> > > > > What I'm not clear on is this: since topic-2 is a transformed topic
> > > > > of topic-1 via "other stream", why do you still need to join it with
> > > > > topic-1? In other words, do topic-1 and topic-2 contain different
> > > > > data, or is topic-2 just storing data similar to topic-1 but in a
> > > > > different format (since it was a transformation result of topic-1
> > > > > via "other stream")?
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Apr 2, 2019 at 4:07 PM Raman Gupta <rocketraman@gmail.com> wrote:
> > > > >
> > > > > > Yes, I forgot to show an item on the topology:
> > > > > >
> > > > > >    +-----------> global-ktable +---------+
> > > > > >    |                                     |
> > > > > >    +                                     v
> > > > > > topic-1                                stream +----> topic-3
> > > > > >    +                                     ^
> > > > > >    |                                     |
> > > > > >    +----> other stream +--> topic-2 +----+
> > > > > >
> > > > > > My use case is a "schema evolution" of the data in topic-2, to
> > > > > > produce topic-3 via "stream". In order to perform this schema
> > > > > > evolution, I need to pull some attributes from the payloads in
> > > > > > topic-1. I can't simply join topic-1 and topic-2 because they do
> > > > > > not share keys. The global-ktable allows me to easily look up the
> > > > > > values I need from topic-1 using an attribute from the payload of
> > > > > > topic-2, and combine those to write to topic-3.
> > > > > >
> > > > > > Regards,
> > > > > > Raman
> > > > > >
> > > > > > On Tue, Apr 2, 2019 at 6:56 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > > > >
> > > > > > > Hello Raman,
> > > > > > >
> > > > > > > It seems from your case that `topic-1` is used for both the
> > > > > > > global ktable as well as another stream, which is then
> > > > > > > transformed to a new stream that will be "joined" somehow with
> > > > > > > the global ktable. Could you elaborate a bit more on why you
> > > > > > > want to use the same source topic for two entities in your
> > > > > > > topology?
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Tue, Apr 2, 2019 at 3:41 PM Raman Gupta <rocketraman@gmail.com> wrote:
> > > > > > >
> > > > > > > > I have a topology like this:
> > > > > > > >
> > > > > > > >    +-----------> global-ktable +---------+
> > > > > > > >    |                                     |
> > > > > > > >    +                                     v
> > > > > > > > topic-1                                stream
> > > > > > > >    +                                     ^
> > > > > > > >    |                                     |
> > > > > > > >    +----> other stream +--> topic-2 +----+
> > > > > > > >
> > > > > > > > IOW, a global ktable is built from topic-1. Meanwhile, "other
> > > > > > > > stream" transforms topic-1 to topic-2. Finally, "stream"
> > > > > > > > operates on topic-2, and as part of its logic, reads data from
> > > > > > > > "global-ktable".
> > > > > > > >
> > > > > > > > I am worried about the race condition present in "stream"
> > > > > > > > between the message showing up on topic-2, and the "get" from
> > > > > > > > "global-ktable". Is there a way, other than retrying the `get`,
> > > > > > > > to avoid this race?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Raman
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
> --
> -- Guozhang
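
The co-partitioning idea from the quoted `StreamPartitioner` suggestion can be modelled in a few lines. This is a toy Python sketch under stated assumptions, not the Kafka Streams API: CRC32 stands in for the real hash, both topics are assumed to have the same (hypothetical) partition count, and `parent_key` is a made-up payload field holding the topic-1 key.

```python
import zlib

NUM_PARTITIONS = 6  # hypothetical; assumed identical for topic-1 and topic-2


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Toy hash-modulo partitioner (CRC32 is a stand-in, not Kafka's hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def topic2_partition(record_key: str, record_value: dict) -> int:
    """Ignore the topic-2 key and route by the topic-1 key in the payload."""
    return partition_for(record_value["parent_key"])


# One topic-1 record fans out to several topic-2 records (1:n), each with
# its own unique key but carrying the topic-1 key in the payload.
topic1_key = "parent-42"
children = [(f"child-{i}", {"parent_key": topic1_key, "seq": i}) for i in range(5)]

custom_partitions = {topic2_partition(k, v) for k, v in children}
default_partitions = {partition_for(k) for k, _ in children}

print("topic-1 partition:      ", partition_for(topic1_key))
print("custom topic-2 routing: ", sorted(custom_partitions))
print("default topic-2 routing:", sorted(default_partitions))
```

With the custom routing, every derived record lands on the same partition number as its topic-1 parent, which is the property co-partitioning needs. In Kafka Streams that routing logic would live in a `StreamPartitioner` implementation, as described in the quoted message.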
