kafka-users mailing list archives

From Mikael Högqvist <hoegqv...@gmail.com>
Subject Re: KafkaStreams KTable#through not creating changelog topic
Date Wed, 23 Nov 2016 16:59:56 GMT
Hi Michael,

thanks for the extensive explanation, and yes it definitely helps with my
understanding of through(). :)

You guessed correctly that I'm doing some "shenanigans" where I'm trying to
derive the changelog of a state store from the state store name. This works
perfectly fine with a naming convention for the topics and by creating
them in Kafka upfront.
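
As a rough illustration of the convention I mean, here is a minimal sketch in
plain Java. It assumes the `<application.id>-<store name>-changelog` pattern
that Kafka Streams uses for its internal changelog topics (the same pattern as
the "my-app-id-a-store-changelog" example further down the thread).
`ChangelogNames` and `changelogTopic` are hypothetical helper names, not part
of the Kafka Streams API.

```java
// Sketch only: builds a topic name following the pattern
// <application.id>-<storeName>-changelog.
// ChangelogNames and changelogTopic are hypothetical names, not Kafka API.
final class ChangelogNames {
    private static final String SUFFIX = "-changelog";

    private ChangelogNames() {}

    static String changelogTopic(String applicationId, String storeName) {
        if (applicationId == null || applicationId.isEmpty()
                || storeName == null || storeName.isEmpty()) {
            throw new IllegalArgumentException(
                    "applicationId and storeName must be non-empty");
        }
        return applicationId + "-" + storeName + SUFFIX;
    }

    public static void main(String[] args) {
        // Name of the topic to create upfront for the store "store-2".
        System.out.println(changelogTopic("my-app-id", "store-2"));
    }
}
```

The returned name is the topic I create in Kafka upfront and then pass as the
topic argument, e.g. `through(changelogTopic("my-app-id", "store-2"), "store-2")`.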

My point is that it would help me (and maybe others), if the API of KTable
was extended to have a new method that does two things that are not part of
the implementation of .through(). 1) Create a state store AND the changelog
topic 2) follow the Kafka Streams naming convention for changelog topics.
Basically, I want to have a method that does what .through() is supposed to
do according to the documentation, but without the "topic" parameter.
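
To illustrate why following the convention matters for tooling, here is a
small sketch (plain Java, no Kafka dependency) that recovers state store names
from a list of topic names. `ChangelogDiscovery`, `storeNameOf` and
`storeNames` are hypothetical names, and the topic list is assumed to come
from whatever admin tooling is at hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch only: finds topics that follow the assumed pattern
// <application.id>-<storeName>-changelog and extracts the store name.
// All names here are hypothetical helpers, not Kafka Streams API.
final class ChangelogDiscovery {
    private static final String SUFFIX = "-changelog";

    private ChangelogDiscovery() {}

    // Returns the store name if the topic matches the pattern, else empty.
    static Optional<String> storeNameOf(String applicationId, String topic) {
        String prefix = applicationId + "-";
        boolean matches = topic.startsWith(prefix)
                && topic.endsWith(SUFFIX)
                && topic.length() > prefix.length() + SUFFIX.length();
        if (!matches) {
            return Optional.empty();
        }
        return Optional.of(
                topic.substring(prefix.length(), topic.length() - SUFFIX.length()));
    }

    // Collects the store names of all topics that match the pattern,
    // preserving the order of the input list.
    static List<String> storeNames(String applicationId, List<String> topics) {
        List<String> stores = new ArrayList<>();
        for (String topic : topics) {
            storeNameOf(applicationId, topic).ifPresent(stores::add);
        }
        return stores;
    }
}
```

A topic passed to .through() under an arbitrary name (say "through-topic") is
simply not found by such a scan, which is why I currently have to apply the
naming convention myself.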

What do you think, would it be possible to extend the API with a method
like that?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <michael@confluent.io> wrote:

> Mikael,
>
> regarding your second question:
>
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
>
> The last operator above would, without "..." ellipsis, be something like
> `KTable#through("through-topic", "store-2")`.  Here, "through-topic" is the
> changelog topic for both the KTable and the state store "store-2".  So this
> is the changelog topic name that you want to know.
>
> - If you want the "through" topic to have a `-changelog` suffix, then you'd
> need to add that yourself in the call to `through(...)`.
>
> - If you wonder why `through()` doesn't add a `-changelog` suffix
> automatically:  That's because `through()` -- like `to()`, `stream()`, or
> `table()` -- requires you to explicitly provide a topic name, and of course
> Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
> added when Kafka creates internal changelog topics behind the scenes for
> you.)   Unfortunately, the javadoc of `KTable#through()` is incorrect
> because it refers to `-changelog`;  we'll fix that as mentioned above.
>
> - Also, in case you want to do some shenanigans (like for some tooling
> you're building around state stores/changelogs/interactive queries) such as
> detecting all state store changelogs by doing the equivalent of `ls
> *-changelog`, then this will miss changelogs of KTables that are created by
> `through()` and `to()` (unless you come up with a naming convention that
> your tooling can assume to be in place, e.g. by always adding `-changelog`
> to topic names when you call `through()`).
>
> I hope this helps!
> Michael
>
>
>
>
> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqvist@gmail.com>
> wrote:
>
> > Hi Eno,
> >
> > 1) Great :)
> >
> > 2) Yes, we are using the Interactive Queries to access the state stores. In
> > addition, we access the changelogs to subscribe to updates. For this reason
> > we need to know the changelog topic name.
> >
> > Thanks,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com>
> > wrote:
> >
> > > Hi Mikael,
> > >
> > > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > > into fixing it. I agree that it can be confusing to have topic names that
> > > are not what one would expect.
> > >
> > > 2) If your goal is to query/read from the state stores, you can use
> > > Interactive Queries to do that (you don't need to worry about the changelog
> > > topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
> > > here:
> > > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > > ).
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > > On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com> wrote:
> > > >
> > > > > Sorry for being unclear, I'll try again :)
> > > > >
> > > > > 1) The JavaDoc for through is not correct, it states that a changelog topic
> > > > > will be created for the state store. That is, if I would call it with
> > > > > through("topic", "a-store"), I would expect a Kafka topic
> > > > > "my-app-id-a-store-changelog" to be created.
> > > >
> > > > 2) Regarding the use case, the topology looks like this:
> > > >
> > > > .stream(...)
> > > > .aggregate(..., "store-1")
> > > > .mapValues(...)
> > > > .through(..., "store-2")
> > > >
> > > > > Basically, I want to materialize both the result from the aggregate method
> > > > > and the result from mapValues, which is materialized using .through().
> > > > > Later, I will access both the tables (store-1 and store-2) to a) get the
> > > > > current state of the aggregate, b) subscribe to future updates. This works
> > > > > just fine. The only issue is that I assumed a changelog topic for
> > > > > store-2 would be created automatically, which didn't happen.
> > > >
> > > > > Since I want to access the changelog topic, it helps if the naming is
> > > > > consistent. So either we enforce the same naming pattern as Kafka when
> > > > > calling .through() or alternatively the Kafka Streams API can provide a
> > > > > method to materialize tables which creates a topic name according to the
> > > > > naming pattern. E.g. .through() without the topic parameter.
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Mikael
> > > >
> > > > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matthias@confluent.io>
> > > > > wrote:
> > > >
> > > > >> I cannot completely follow what you want to achieve.
> > > >>
> > > > >> However, the JavaDoc for through() seems not to be correct to me. Using
> > > > >> through() will not create an extra internal changelog topic with the
> > > > >> described naming schema, because the topic specified in through() can be
> > > > >> used for this (there is no point in duplicating the data).
> > > >>
> > > > >> If you have a KTable and apply a mapValues(), this will not write data
> > > > >> to any topic. The derived KTable is in-memory because you can easily
> > > > >> recreate it from its base KTable.
> > > >>
> > > >> What is the missing part you want to get?
> > > >>
> > > > >> Btw: the internally created changelog topics are only used for recovery
> > > > >> in case of failure. Streams does not consume from those topics during
> > > > >> "normal operation".
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > >>> Hi,
> > > >>>
> > > > >>> in the documentation for KTable#through, it is stated that a new changelog
> > > > >>> topic will be created for the table. It also states that calling through is
> > > > >>> equivalent to calling #to followed by KStreamBuilder#table.
> > > >>>
> > > >>>
> > > >>
> > > http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> > streams/kstream/KTable.html#through(org.apache.kafka.
> > common.serialization.Serde,%20org.apache.kafka.common.
> > serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > >>>
> > > > >>> In the docs for KStreamBuilder#table it is stated that no new changelog
> > > > >>> topic will be created since the underlying topic acts as the changelog.
> > > > >>> I've verified that this is the case.
> > > >>>
> > > > >>> Is there another API method to materialize the results of a KTable
> > > > >>> including a changelog, i.e. such that Kafka Streams creates the topic and
> > > > >>> uses the naming schema for changelog topics? The use case I have in mind is
> > > > >>> aggregate followed by mapValues.
> > > >>>
> > > >>> Best,
> > > >>> Mikael
> > > >>>
> > > >>
> > > >>
> > >
> > >
> >
>