kafka-users mailing list archives

From Michael Noll <mich...@confluent.io>
Subject Re: KafkaStreams KTable#through not creating changelog topic
Date Wed, 23 Nov 2016 15:15:02 GMT
Mikael,

regarding your second question:

> 2) Regarding the use case, the topology looks like this:
>
> .stream(...)
> .aggregate(..., "store-1")
> .mapValues(...)
> .through(..., "store-2")

Written out without the "..." ellipsis, the last operator above would be
something like `KTable#through("through-topic", "store-2")`.  Here,
"through-topic" is the changelog topic for both the KTable and the state
store "store-2".  So this is the changelog topic name that you want to know.

- If you want the "through" topic to have a `-changelog` suffix, then you'd
need to add that yourself in the call to `through(...)`.

- If you wonder why `through()` doesn't add a `-changelog` suffix
automatically:  That's because `through()` -- like `to()`, `stream()`, or
`table()` -- requires you to explicitly provide a topic name, and of course
Kafka will use exactly this name.  (FWIW, the `-changelog` suffix is only
added when Kafka creates internal changelog topics behind the scenes for
you.)   Unfortunately, the javadoc of `KTable#through()` is incorrect
because it refers to `-changelog`;  we'll fix that, as already mentioned
earlier in this thread.

- Also, in case you want to do some shenanigans (like for some tooling
you're building around state stores/changelogs/interactive queries) such as
detecting all state store changelogs by doing the equivalent of `ls
*-changelog`, then this will miss changelogs of KTables that are created by
`through()` and `to()` (unless you come up with a naming convention that
your tooling can assume to be in place, e.g. by always adding `-changelog`
to topic names when you call `through()`).
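
To make the last point concrete, here is a rough sketch in plain Java (no
Kafka dependency) of what such a naming convention could look like on the
tooling side.  The class and method names are made up for illustration and
are not part of the Kafka Streams API.  The only thing taken from the thread
is the pattern `<application.id>-<store name>-changelog` for internally
created changelog topics; the `throughTopicFor()` convention is just one
assumption about how you might name your own `through()` topics.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for tooling; not part of Kafka Streams.
public class ChangelogTopicNames {

    static final String CHANGELOG_SUFFIX = "-changelog";

    // Name pattern of changelog topics that Kafka Streams creates internally,
    // e.g. "my-app-id-a-store-changelog".
    static String internalChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + CHANGELOG_SUFFIX;
    }

    // Assumed convention: the topic name you pass to through() yourself,
    // chosen so that suffix-based tooling still finds it.
    static String throughTopicFor(String storeName) {
        return storeName + CHANGELOG_SUFFIX;
    }

    // The equivalent of `ls *-changelog` over a list of topic names.
    static List<String> findChangelogTopics(List<String> topics) {
        return topics.stream()
                .filter(topic -> topic.endsWith(CHANGELOG_SUFFIX))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                internalChangelogTopic("my-app-id", "store-1"), // created by Kafka Streams
                "through-topic",                                // through() topic without the suffix
                throughTopicFor("store-2"));                    // through() topic following the convention

        // "through-topic" is not listed because it lacks the suffix.
        System.out.println(findChangelogTopics(topics));
    }
}
```

With this convention you would call `through()` with the name returned by
`throughTopicFor("store-2")` instead of an arbitrary topic name.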

I hope this helps!
Michael




On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqvist@gmail.com>
wrote:

> Hi Eno,
>
> 1) Great :)
>
> 2) Yes, we are using the Interactive Queries to access the state stores. In
> addition, we access the changelogs to subscribe to updates. For this reason
> we need to know the changelog topic name.
>
> Thanks,
> Mikael
>
> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com>
> wrote:
>
> > Hi Mikael,
> >
> > 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> > into fixing it. I agree that it can be confusing to have topic names that
> > are not what one would expect.
> >
> > 2) If your goal is to query/read from the state stores, you can use
> > Interactive Queries to do that (you don't need to worry about the
> > changelog topic name and such). Interactive Queries is a new feature in
> > 0.10.1 (blog here:
> > https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > ).
> >
> > Thanks
> > Eno
> >
> >
> > > On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com> wrote:
> > >
> > > Sorry for being unclear, i'll try again :)
> > >
> > > > 1) The JavaDoc for through is not correct, it states that a changelog
> > > > topic will be created for the state store. That is, if I would call it
> > > > with through("topic", "a-store"), I would expect a kafka topic
> > > > "my-app-id-a-store-changelog" to be created.
> > >
> > > 2) Regarding the use case, the topology looks like this:
> > >
> > > .stream(...)
> > > .aggregate(..., "store-1")
> > > .mapValues(...)
> > > .through(..., "store-2")
> > >
> > > > Basically, I want to materialize both the result from the aggregate
> > > > method and the result from mapValues, which is materialized using
> > > > .through(). Later, I will access both the tables (store-1 and store-2)
> > > > to a) get the current state of the aggregate, b) subscribe to future
> > > > updates. This works just fine. The only issue is that I assumed a
> > > > changelog topic for store-2 would be created automatically, which
> > > > didn't happen.
> > >
> > > > Since I want to access the changelog topic, it helps if the naming is
> > > > consistent. So either we enforce the same naming pattern as kafka when
> > > > calling .through() or alternatively the Kafka Streams API can provide a
> > > > method to materialize tables which creates a topic name according to
> > > > the naming pattern. E.g. .through() without the topic parameter.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Mikael
> > >
> > > > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matthias@confluent.io>
> > > > wrote:
> > >
> > > >> I cannot completely follow what you want to achieve.
> > > >>
> > > >> However, the JavaDoc for through() seems not to be correct to me.
> > > >> Using through() will not create an extra internal changelog topic with
> > > >> the described naming schema, because the topic specified in through()
> > > >> can be used for this (there is no point in duplicating the data).
> > >>
> > >> If you have a KTable and apply a mapValues(), this will not write data
> > >> to any topic. The derived KTable is in-memory because you can easily
> > >> recreate it from its base KTable.
> > >>
> > >> What is the missing part you want to get?
> > >>
> > > >> Btw: the internally created changelog topics are only used for
> > > >> recovery in case of failure. Streams does not consume from those
> > > >> topics during "normal operation".
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > >>> Hi,
> > >>>
> > > >>> in the documentation for KTable#through, it is stated that a new
> > > >>> changelog topic will be created for the table. It also states that
> > > >>> calling through is equivalent to calling #to followed by
> > > >>> KStreamBuilder#table.
> > >>>
> > >>>
> > >>
> > http://kafka.apache.org/0101/javadoc/org/apache/kafka/
> streams/kstream/KTable.html#through(org.apache.kafka.
> common.serialization.Serde,%20org.apache.kafka.common.
> serialization.Serde,%20java.lang.String,%20java.lang.String)
> > >>>
> > > >>> In the docs for KStreamBuilder#table it is stated that no new
> > > >>> changelog topic will be created since the underlying topic acts as
> > > >>> the changelog. I've verified that this is the case.
> > >>>
> > > >>> Is there another API method to materialize the results of a KTable
> > > >>> including a changelog, i.e. such that kafka streams creates the topic
> > > >>> and uses the naming schema for changelog topics? The use case I have
> > > >>> in mind is aggregate followed by mapValues.
> > >>>
> > >>> Best,
> > >>> Mikael
> > >>>
> > >>
> > >>
> >
> >
>
