kafka-users mailing list archives

From Mikael Högqvist <hoegqv...@gmail.com>
Subject Re: KafkaStreams KTable#through not creating changelog topic
Date Thu, 24 Nov 2016 11:27:27 GMT
Yes, the naming is not an issue.
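
As a minimal sketch of the convention Matthias quotes below ("applicationId-storeName-changelog"), the topic name can be derived from the store name roughly like this. The class and method names are hypothetical, and treating "words" as the application id is an assumption for illustration only:

```java
// Sketch only: builds a changelog topic name following the
// "applicationId-storeName-changelog" convention quoted in this thread.
// ChangelogNames and changelogTopic are hypothetical names, not Kafka API.
final class ChangelogNames {
    private ChangelogNames() {}

    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Assumption: application id "words" and store name "count".
        System.out.println(changelogTopic("words", "count"));
    }
}
```

The result is what would be passed as the topic argument to .through() so that tooling can find the topic from the store name alone.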

I've tested this with the topology described earlier. Every time I start
the topology with a call to .through() that references a topic that does
not exist, I get an exception from the UncaughtExceptionHandler:

Uncaught exception org.apache.kafka.streams.errors.StreamsException: Topic
not found during partition assignment: words-count-changelog

This happens when .through("words-count-changelog", "count") is part of the
topology. The topology is also not forwarding anything to that topic/store.
After restarting the application it works fine.
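
Since the failure only shows up when the topic is missing at startup, one possible workaround is a pre-flight check before the topology is started. The sketch below is plain Java and only computes which required topics are absent. The class and method names are hypothetical, and how the set of existing topics is obtained (for example from the key set of the map returned by `KafkaConsumer#listTopics()`) is left as an assumption:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch only: TopicPreflight and missingTopics are hypothetical helpers,
// not part of Kafka Streams.
final class TopicPreflight {
    private TopicPreflight() {}

    // Returns the required topics that are not in the existing set,
    // preserving the iteration order of 'required'.
    static Set<String> missingTopics(Set<String> existing, Set<String> required) {
        Set<String> missing = new LinkedHashSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: "words" is an input topic that already exists.
        Set<String> existing = new HashSet<>(Arrays.asList("words"));
        Set<String> required = new LinkedHashSet<>(
                Arrays.asList("words", "words-count-changelog"));
        System.out.println(missingTopics(existing, required));
    }
}
```

Any topic reported as missing could then be created in Kafka upfront, which is the approach described further down the thread.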

Are the changelog topics created via, for example, .aggregate() different
to topics auto created via .through()?

Thanks,
Mikael

On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matthias@confluent.io>
wrote:

> > 1) Create a state store AND the changelog
> > topic 2) follow the Kafka Streams naming convention for changelog topics.
> > Basically, I want to have a method that does what .through() is supposed
> > to do according to the documentation, but without the "topic" parameter.
>
> I understand what you are saying, but you can get this done right now,
> too. If you use through(...) you will get the store. And you can just
> specify the topic name as "applicationId-storeName-changelog" to follow
> the naming convention Streams uses internally. What is the problem with
> this approach (besides having to provide the topic name, which does not
> seem like a big burden to me)?
>
>
> -Matthias
>
>
> On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > Hi Michael,
> >
> > thanks for the extensive explanation, and yes it definitely helps with my
> > understanding of through(). :)
> >
> > You guessed correctly that I'm doing some "shenanigans" where I'm trying
> > to derive the changelog of a state store from the state store name. This
> > works perfectly fine with a naming convention for the topics and by
> > creating them in Kafka upfront.
> >
> > My point is that it would help me (and maybe others) if the API of KTable
> > was extended with a new method that does two things that are not part of
> > the implementation of .through(): 1) create a state store AND the
> > changelog topic, 2) follow the Kafka Streams naming convention for
> > changelog topics. Basically, I want to have a method that does what
> > .through() is supposed to do according to the documentation, but without
> > the "topic" parameter.
> >
> > What do you think, would it be possible to extend the API with a method
> > like that?
> >
> > Thanks,
> > Mikael
> >
> > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <michael@confluent.io>
> wrote:
> >
> >> Mikael,
> >>
> >> regarding your second question:
> >>
> >>> 2) Regarding the use case, the topology looks like this:
> >>>
> >>> .stream(...)
> >>> .aggregate(..., "store-1")
> >>> .mapValues(...)
> >>> .through(..., "store-2")
> >>
> >> The last operator above would, without the "..." ellipsis, be something
> >> like `KTable#through("through-topic", "store-2")`.  Here, "through-topic"
> >> is the changelog topic for both the KTable and the state store "store-2".
> >> So this is the changelog topic name that you want to know.
> >>
> >> - If you want the "through" topic to have a `-changelog` suffix, then
> >> you'd need to add that yourself in the call to `through(...)`.
> >>
> >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> >> automatically:  That's because `through()` -- like `to()` or `stream()`,
> >> `table()` -- requires you to explicitly provide a topic name, and of
> >> course Kafka will use exactly this name.  (FWIW, the `-changelog` suffix
> >> is only added when Kafka creates internal changelog topics behind the
> >> scenes for you.)   Unfortunately, the javadocs of `KTable#through()` are
> >> incorrect because they refer to `-changelog`;  we'll fix that as
> >> mentioned above.
> >>
> >> - Also, in case you want to do some shenanigans (like for some tooling
> >> you're building around state stores/changelogs/interactive queries) such
> >> as detecting all state store changelogs by doing the equivalent of `ls
> >> *-changelog`, then this will miss changelogs of KTables that are created
> >> by `through()` and `to()` (unless you come up with a naming convention
> >> that your tooling can assume to be in place, e.g. by always adding
> >> `-changelog` to topic names when you call `through()`).
> >>
> >> I hope this helps!
> >> Michael
> >>
> >>
> >>
> >>
> >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <hoegqvist@gmail.com>
> >> wrote:
> >>
> >>> Hi Eno,
> >>>
> >>> 1) Great :)
> >>>
> >>> 2) Yes, we are using the Interactive Queries to access the state stores.
> >>> In addition, we access the changelogs to subscribe to updates. For this
> >>> reason we need to know the changelog topic name.
> >>>
> >>> Thanks,
> >>> Mikael
> >>>
> >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Mikael,
> >>>>
> >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> >>>> looking into fixing it. I agree that it can be confusing to have topic
> >>>> names that are not what one would expect.
> >>>>
> >>>> 2) If your goal is to query/read from the state stores, you can use
> >>>> Interactive Queries to do that (you don't need to worry about the
> >>>> changelog topic name and such). Interactive Queries is a new feature in
> >>>> 0.10.1 (blog here:
> >>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> >>>> ).
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>
> >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com>
> >> wrote:
> >>>>>
> >>>>> Sorry for being unclear, I'll try again :)
> >>>>>
> >>>>> 1) The JavaDoc for through is not correct, it states that a changelog
> >>>>> topic will be created for the state store. That is, if I called it with
> >>>>> through("topic", "a-store"), I would expect a kafka topic
> >>>>> "my-app-id-a-store-changelog" to be created.
> >>>>>
> >>>>> 2) Regarding the use case, the topology looks like this:
> >>>>>
> >>>>> .stream(...)
> >>>>> .aggregate(..., "store-1")
> >>>>> .mapValues(...)
> >>>>> .through(..., "store-2")
> >>>>>
> >>>>> Basically, I want to materialize both the result from the aggregate
> >>>>> method and the result from mapValues, which is materialized using
> >>>>> .through(). Later, I will access both the tables (store-1 and store-2)
> >>>>> to a) get the current state of the aggregate, b) subscribe to future
> >>>>> updates. This works just fine. The only issue is that I assumed a
> >>>>> changelog topic for store-2 would be created automatically, which
> >>>>> didn't happen.
> >>>>>
> >>>>> Since I want to access the changelog topic, it helps if the naming is
> >>>>> consistent. So either we enforce the same naming pattern as kafka when
> >>>>> calling .through() or alternatively the Kafka Streams API can provide a
> >>>>> method to materialize tables which creates a topic name according to
> >>>>> the naming pattern. E.g. .through() without the topic parameter.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Mikael
> >>>>>
> >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I cannot completely follow what you want to achieve.
> >>>>>>
> >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> >>>>>> Using through() will not create an extra internal changelog topic with
> >>>>>> the described naming schema, because the topic specified in through()
> >>>>>> can be used for this (there is no point in duplicating the data).
> >>>>>>
> >>>>>> If you have a KTable and apply a mapValues(), this will not write data
> >>>>>> to any topic. The derived KTable is in-memory because you can easily
> >>>>>> recreate it from its base KTable.
> >>>>>>
> >>>>>> What is the missing part you want to get?
> >>>>>>
> >>>>>> Btw: the internally created changelog topics are only used for
> >>>>>> recovery in case of failure. Streams does not consume from those
> >>>>>> topics during "normal operation".
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> in the documentation for KTable#through, it is stated that a new
> >>>>>>> changelog topic will be created for the table. It also states that
> >>>>>>> calling through is equivalent to calling #to followed by
> >>>>>>> KStreamBuilder#table.
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >>>>>>>
> >>>>>>> In the docs for KStreamBuilder#table it is stated that no new
> >>>>>>> changelog topic will be created since the underlying topic acts as
> >>>>>>> the changelog. I've verified that this is the case.
> >>>>>>>
> >>>>>>> Is there another API method to materialize the results of a KTable
> >>>>>>> including a changelog, i.e. such that kafka streams creates the topic
> >>>>>>> and uses the naming schema for changelog topics? The use case I have
> >>>>>>> in mind is aggregate followed by mapValues.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Mikael
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>
