kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Fwd: [DISCUSS] KIP-114: KTable materialization and improved semantics
Date Fri, 27 Jan 2017 15:51:42 GMT
I think Jan is saying that KTables don't always need to be materialized, i.e.,
filter just needs to apply the ValueGetter; it doesn't need yet another
physical state store.
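
To make the point concrete, here is a minimal sketch of a filtered view that reads through its parent instead of keeping its own store. The names `ValueGetter`, `StoreBackedGetter` and `FilteredGetter` are hypothetical stand-ins for illustration and are not the actual Kafka Streams internals; a plain `Map` plays the role of the parent's physical state store.

```java
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical read-only view over a table (an assumption for this sketch,
// not the Kafka Streams interface of the same name).
interface ValueGetter<K, V> {
    V get(K key);
}

// Reads directly from a backing map, which stands in for the parent
// table's physical state store.
final class StoreBackedGetter<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> store;

    StoreBackedGetter(Map<K, V> store) {
        this.store = store;
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

// Filtered view: delegates the lookup to the parent getter and applies the
// predicate at read time. It holds no data of its own, so no second store.
final class FilteredGetter<K, V> implements ValueGetter<K, V> {
    private final ValueGetter<K, V> parent;
    private final BiPredicate<K, V> predicate;

    FilteredGetter(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
        this.parent = parent;
        this.predicate = predicate;
    }

    @Override
    public V get(K key) {
        V value = parent.get(key);
        // A record that is absent or fails the filter looks the same to the caller.
        return (value != null && predicate.test(key, value)) ? value : null;
    }
}
```

In this sketch, whoever calls `get` (another processor or an interactive query) sees the filtered result without knowing whether a dedicated store exists behind it.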

On Fri, 27 Jan 2017 at 15:49 Michael Noll <michael@confluent.io> wrote:

> Like Damian, and for the same reasons, I am more in favor of overloading
> methods rather than introducing `materialize()`.
> FWIW, we already have a similar API setup for e.g.
> `KTable#through(topicName, stateStoreName)`.
>
> A related but slightly different question is what e.g. Jan Filipiak
> mentioned earlier in this thread:
> I think we need to explain more clearly why KIP-114 doesn't propose the
> seemingly simpler solution of always materializing tables/state stores.
>
>
>
> On Fri, Jan 27, 2017 at 4:38 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>
> > Hi,
> >
> > Yeah, it's confusing. Why shouldn't it be queryable by IQ? If you use the
> > ValueGetter of Filter, it will apply the filter and should be completely
> > transparent as to whether another processor or IQ is accessing it. How can
> > this new method help?
> >
> > I cannot see the reason for the additional materialize method being
> > required! Hence I suggest leaving it alone.
> > Regarding removing the others, I don't have strong opinions, and it seems
> > to be unrelated.
> >
> > Best Jan
> >
> >
> >
> >
> > On 26.01.2017 20:48, Eno Thereska wrote:
> >
> >> Forwarding this thread to the users list too in case people would like to
> >> comment. It is also on the dev list.
> >>
> >> Thanks
> >> Eno
> >>
> >> Begin forwarded message:
> >>>
> >>> From: "Matthias J. Sax" <matthias@confluent.io>
> >>> Subject: Re: [DISCUSS] KIP-114: KTable materialization and improved
> >>> semantics
> >>> Date: 24 January 2017 at 19:30:10 GMT
> >>> To: dev@kafka.apache.org
> >>> Reply-To: dev@kafka.apache.org
> >>>
> >>> That's not what I meant by "huge impact".
> >>>
> >>> I refer to the actions related to materializing a KTable: creating a
> >>> RocksDB store and a changelog topic -- users should be aware of the
> >>> runtime implications, and this is better expressed by an explicit method
> >>> call than implicitly triggered by using a different overload of
> >>> a method.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 1/24/17 1:35 AM, Damian Guy wrote:
> >>>
> >>>> I think your definition of a huge impact and mine are rather different
> >>>> ;-P
> >>>> Overloading a few methods is not really a huge impact IMO. It is also a
> >>>> sacrifice worth making for readability and usability of the API.
> >>>>
> >>>> On Mon, 23 Jan 2017 at 17:55 Matthias J. Sax <matthias@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> I understand your argument, but do not agree with it.
> >>>>>
> >>>>> Your first version (even if the "flow" is not as nice) is more explicit
> >>>>> than the second version. Adding a stateStoreName parameter is quite
> >>>>> implicit but has a huge impact -- thus, I prefer the more verbose
> >>>>> but explicit version.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 1/23/17 1:39 AM, Damian Guy wrote:
> >>>>>
> >>>>>> I'm not a fan of materialize. I think it interrupts the flow, i.e.,
> >>>>>>
> >>>>>> table.mapValues(..).materialize().join(..).materialize()
> >>>>>> compared to:
> >>>>>> table.mapValues(..).join(..)
> >>>>>>
> >>>>>> I know which one I prefer.
> >>>>>> My preference is still to provide overloaded methods where people can
> >>>>>> specify the store names if they want; otherwise we just generate them.
> >>>>>>
> >>>>>> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matthias@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> thanks for the KIP Eno! Here are my 2 cents:
> >>>>>>>
> >>>>>>> 1) I like Guozhang's proposal about removing the store name from all
> >>>>>>> KTable methods and generating internal names (however, I would do this
> >>>>>>> as overloads). Furthermore, I would not force users to call
> >>>>>>> .materialize() if they want to query a store, but add one more method
> >>>>>>> .stateStoreName() that returns the store name if the KTable is
> >>>>>>> materialized. Thus, .materialize() also does not necessarily need a
> >>>>>>> storeName parameter (i.e., we should have some overloads here).
> >>>>>>>
> >>>>>>> I would also not allow providing a null store name (to indicate no
> >>>>>>> materialization if not necessary) but throw an exception.
> >>>>>>>
> >>>>>>> This yields some simplification (see below).
> >>>>>>>
> >>>>>>>
> >>>>>>> 2) I also like Guozhang's proposal about KStream#toTable()
> >>>>>>>
> >>>>>>>
> >>>>>>> 3)
> >>>>>>>
> >>>>>>>>> 3. What will happen when you call materialize on KTable that is
> >>>>>>>>> already materialized? Will it create another StateStore (providing
> >>>>>>>>> the name is different), throw an Exception?
> >>>>>>>>
> >>>>>>>> Currently an exception is thrown, but see below.
> >>>>>>>
> >>>>>>> If we follow approach (1) from Guozhang, there is no need to worry about
> >>>>>>> a second materialization, and no exception needs to be thrown. A call to
> >>>>>>> .materialize() basically sets a "materialized flag" (i.e., an idempotent
> >>>>>>> operation) and sets a new name.
> >>>>>>>
> >>>>>>>
> >>>>>>> 4)
> >>>>>>>
> >>>>>>>>> Rename toStream() to toKStream() for consistency.
> >>>>>>>>
> >>>>>>>> Not sure whether that is really required. We also use
> >>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
> >>>>>>>> and don't care about the "K" prefix.
> >>>>>>>
> >>>>>>> Eno's reply:
> >>>>>>>
> >>>>>>>> I think changing it to `toKStream` would make it absolutely clear what
> >>>>>>>> we are converting it to.
> >>>>>>>>
> >>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
> >>>>>>>> in this KIP).
> >>>>>>>
> >>>>>>> I would keep #toStream(). (see below)
> >>>>>>>
> >>>>>>>
> >>>>>>> 5) We should not remove any methods but only deprecate them.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> A general note:
> >>>>>>>
> >>>>>>> I do not understand your comments in "Rejected Alternatives". You say
> >>>>>>> "Have the KTable be the materialized view" was rejected. But your KIP
> >>>>>>> actually does exactly this -- the changelog abstraction of KTable is
> >>>>>>> secondary after those changes, and the "view" abstraction is what a
> >>>>>>> KTable is. And just to be clear, I like this a lot:
> >>>>>>>
> >>>>>>> - it aligns with the name KTable
> >>>>>>> - it aligns with stream-table-duality
> >>>>>>> - it aligns with IQ
> >>>>>>>
> >>>>>>> I would say that a KTable is a "view abstraction" (as materialization
> >>>>>>> is optional).
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
> >>>>>>>
> >>>>>>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
> >>>>>>>> comments:
> >>>>>>>>
> >>>>>>>> 1. I like the materialize() function in general, but I would like to
> >>>>>>>> see how other KTable functions should be updated accordingly. For
> >>>>>>>> example: 1) KStreamBuilder.table(..) has a state store name parameter,
> >>>>>>>> and we will always materialize the KTable unless its state store name
> >>>>>>>> is set to null; 2) KTable.agg requires the result KTable to be
> >>>>>>>> materialized, and hence it also has a state store name; 3) KTable.join
> >>>>>>>> requires the joining table to be materialized. Today we do not
> >>>>>>>> actually have a mechanism to enforce that, but will only throw an
> >>>>>>>> exception at runtime if it is not (e.g. if you have
> >>>>>>>> "builder.table("topic", null).join()", an RTE will be thrown).
> >>>>>>>>
> >>>>>>>> I'd make an extended proposal just to kick off the discussion here:
> >>>>>>>> let's remove all the state store params in other KTable functions, and
> >>>>>>>> if in some cases a KTable has to be materialized (e.g. a KTable
> >>>>>>>> resulting from KXX.agg) and users do not call materialize(), then we
> >>>>>>>> treat it as "users are not interested in querying it at all" and hence
> >>>>>>>> use an internal name generated for the materialized KTable; i.e.,
> >>>>>>>> although it is materialized, the state store is not exposed to users.
> >>>>>>>> And if users call materialize() afterwards but we have already decided
> >>>>>>>> to materialize it, we can replace the internal name with the
> >>>>>>>> user-provided name. Then from a user's point of view, if they ever
> >>>>>>>> want to query a KTable, they have to call materialize() with a given
> >>>>>>>> state store name. This approach has one awkwardness though: the serdes
> >>>>>>>> and state store name params are not separated and could overlap (see
> >>>>>>>> detailed comment #2 below).
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 2. This step does not need to be included in this KIP, but just as a
> >>>>>>>> reference / future work: as we have discussed before, we may enforce
> >>>>>>>> materializing KTables resulting from KTable.join as well in the
> >>>>>>>> future. If we do that, then:
> >>>>>>>>
> >>>>>>>> a) KTables resulting from KXX.agg are always materialized;
> >>>>>>>> b) KTable.agg requires the aggregating KTable to always be
> >>>>>>>> materialized (otherwise we would not know the old value);
> >>>>>>>> c) KTables resulting from KTable.join are always materialized, and so
> >>>>>>>> are the joining KTables;
> >>>>>>>> d) materialization of KTables resulting from KTable.filter/mapValues
> >>>>>>>> depends on their parent's materialization.
> >>>>>>>>
> >>>>>>>> By recursive induction all KTables are actually always materialized,
> >>>>>>>> and then the effect of "materialize()" is just to specify the state
> >>>>>>>> store names. In this scenario, we do not need to send Change<V> in
> >>>>>>>> repartition topics within joins any more, but only for repartition
> >>>>>>>> topics within aggregations. Instead, we can just send a "tombstone"
> >>>>>>>> without the old value, and we do not need to calculate joins twice
> >>>>>>>> (one more time when the old value is received).
> >>>>>>>>
> >>>>>>>> 3. I'm wondering if it is worthwhile to add a "KStream#toTable()"
> >>>>>>>> function which is interpreted as a dummy aggregation where the new
> >>>>>>>> value always replaces the old value. I have seen a couple of use cases
> >>>>>>>> of this: for example, users want to read a changelog topic, apply some
> >>>>>>>> filters, and then materialize it into a KTable with state stores
> >>>>>>>> without creating duplicated changelog topics. With materialize() and
> >>>>>>>> toTable I'd imagine users can specify something like:
> >>>>>>>>
> >>>>>>>> "
> >>>>>>>> KStream stream = builder.stream("topic1").filter(..);
> >>>>>>>> KTable table = stream.toTable(..);
> >>>>>>>> table.materialize("state1");
> >>>>>>>> "
> >>>>>>>>
> >>>>>>>> And the library in this case could set the changelog topic of store
> >>>>>>>> "state1" to be "topic1", applying the filter on the fly while
> >>>>>>>> (re-)storing its state by reading from this topic, instead of creating
> >>>>>>>> a second changelog topic like "appID-state1-changelog", which is a
> >>>>>>>> semi-duplicate of "topic1".
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Detailed:
> >>>>>>>>
> >>>>>>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
> >>>>>>>> about renaming it to "#toChangeLog", but after thinking a bit more I
> >>>>>>>> think #toStream is still better, and we can just mention in the
> >>>>>>>> javadoc that it transforms the underlying changelog stream into a
> >>>>>>>> normal stream.
> >>>>>>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
> >>>>>>>> already specified in a previous operation, whereas in others they are
> >>>>>>>> not known before calling materialize, for example:
> >>>>>>>> stream.groupByKey.agg(serde).materialize(serde) vs.
> >>>>>>>> table.mapValues(/*no serde specified*/).materialize(serde). We need to
> >>>>>>>> specify what the handling logic is here.
> >>>>>>>> 3. We can remove the "KTable#to" call as well, and force users to call
> >>>>>>>> "KTable.toStream.to" to be more clear.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.thereska@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> I think changing it to `toKStream` would make it absolutely clear what
> >>>>>>>>> we are converting it to.
> >>>>>>>>>
> >>>>>>>>> I'd say we should probably change the KStreamBuilder methods (but not
> >>>>>>>>> in this KIP).
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Eno
> >>>>>>>>>
> >>>>>>>>> On 17 Jan 2017, at 13:59, Michael Noll <michael@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>>> Rename toStream() to toKStream() for consistency.
> >>>>>>>>>>
> >>>>>>>>>> Not sure whether that is really required. We also use
> >>>>>>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
> >>>>>>>>>> and don't care about the "K" prefix.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.thereska@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Thanks Damian, answers inline:
> >>>>>>>>>>>
> >>>>>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian.guy@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Eno,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP. Some comments:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. I'd probably rename materialized to materialize.
> >>>>>>>>>>>
> >>>>>>>>>>> Ok.
> >>>>>>>>>>>
> >>>>>>>>>>>> 2. I don't think the addition of the new Log compaction mechanism
> >>>>>>>>>>>> is necessary for this KIP, i.e., the KIP is useful without it.
> >>>>>>>>>>>> Maybe that should be a different KIP?
> >>>>>>>>>>>
> >>>>>>>>>>> Agreed, already removed. Will do a separate KIP for that.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> 3. What will happen when you call materialize on KTable that is
> >>>>>>>>>>>> already materialized? Will it create another StateStore (providing
> >>>>>>>>>>>> the name is different), throw an Exception?
> >>>>>>>>>>>
> >>>>>>>>>>> Currently an exception is thrown, but see below.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> 4. Have you considered overloading the existing KTable operations
> >>>>>>>>>>>> to add a state store name? So if a state store name is provided,
> >>>>>>>>>>>> then materialize a state store? This would be my preferred
> >>>>>>>>>>>> approach as I don't think materialize is always a valid operation.
> >>>>>>>>>>>
> >>>>>>>>>>> Ok I can see your point. This will increase the KIP size since I'll
> >>>>>>>>>>> need to enumerate all overloaded methods, but it's not a problem.
> >>>>>>>>>>>
> >>>>>>>>>>>> 5. The materialize method will need a value Serde as some
> >>>>>>>>>>>> operations, i.e., mapValues, join etc. can change the value types.
> >>>>>>>>>>>> 6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean
> >>>>>>>>>>>> that we always need to materialize the StateStore for
> >>>>>>>>>>>> KTable-KTable joins. If that is the case, then the KTable Join
> >>>>>>>>>>>> operators will also need Serde information.
> >>>>>>>>>>>
> >>>>>>>>>>> I'll update the KIP with the serdes.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>> Eno
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Damian
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.thereska@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We created "KIP-114: KTable materialization and improved
> >>>>>>>>>>>>> semantics" to solidify the KTable semantics in Kafka Streams:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Your feedback is appreciated.
> >>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>
> >
>
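
For anyone skimming the quoted thread, Matthias's points 1) and 3) (generated internal names, a null store name rejected with an exception, `.materialize()` as an idempotent flag that can also set a new name, and `.stateStoreName()` for lookup) can be sketched as a toy model. `ToyTable` and the `internal-store-` naming scheme are made up for illustration; this is not the KTable API and not what the KIP specifies.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: a hypothetical class, not the Kafka Streams KTable API.
final class ToyTable {
    // Counter used to generate internal store names (naming scheme is assumed).
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    private boolean materialized;
    private String storeName;

    // Sets the "materialized flag" with a generated internal name.
    // Calling it again changes nothing, so the operation is idempotent.
    ToyTable materialize() {
        if (!materialized) {
            materialized = true;
            storeName = "internal-store-" + NEXT_ID.getAndIncrement();
        }
        return this;
    }

    // Named overload: sets the flag and replaces any earlier (internal) name.
    // A null name is rejected instead of meaning "do not materialize".
    ToyTable materialize(String name) {
        if (name == null) {
            throw new IllegalArgumentException("store name must not be null");
        }
        materialized = true;
        storeName = name;
        return this;
    }

    // Returns the store name only if the table is materialized.
    Optional<String> stateStoreName() {
        return materialized ? Optional.of(storeName) : Optional.empty();
    }
}
```

Under this model a second `materialize()` call never throws, and a later `materialize("state1")` simply swaps the generated name for the user's name.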
