kafka-users mailing list archives

From Mikael Högqvist <hoegqv...@gmail.com>
Subject Re: KafkaStreams KTable#through not creating changelog topic
Date Fri, 25 Nov 2016 10:15:14 GMT
Thanks, based on this we will re-evaluate the use of internal topics. The
main motivation for using the internal changelog topics was to avoid
duplication of data and have an easy way to access the update stream of any
state store.
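
For anyone who still wants to derive changelog topic names from store
names, here is a minimal sketch in plain Java of the convention quoted
further down in this thread ("applicationId-storeName-changelog") and of
the `ls *-changelog` style lookup. The class and method names
(ChangelogTopics, nameFor, filterChangelogs) are hypothetical helpers and
not part of the Kafka Streams API. The naming pattern is the one Streams
uses for its internal topics as described below, so it may differ in other
versions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class ChangelogTopics {
    // Suffix that Streams appends to internally created changelog topics,
    // as described in this thread.
    static final String SUFFIX = "-changelog";

    private ChangelogTopics() {}

    // Builds "<applicationId>-<storeName>-changelog". Pass the result as the
    // topic name to through(...) if you want user topics to follow the same
    // pattern; the topic itself still has to be created up front.
    static String nameFor(String applicationId, String storeName) {
        return applicationId + "-" + storeName + SUFFIX;
    }

    // Equivalent of `ls *-changelog` over an already fetched list of topic names.
    static List<String> filterChangelogs(List<String> topicNames) {
        return topicNames.stream()
                .filter(name -> name.endsWith(SUFFIX))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String topic = nameFor("my-app-id", "a-store");
        System.out.println(topic);
        System.out.println(filterChangelogs(
                Arrays.asList(topic, "through-topic", "words-count-changelog")));
    }
}
```

Note that the filter only finds topics passed to through() or to() if they
were named with the suffix by hand, which is the limitation Michael
describes below.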

Best,
Mikael

On Fri, Nov 25, 2016 at 9:52 AM Michael Noll <michael@confluent.io> wrote:

> Mikael,
>
> > Sure, I guess the topic is auto-created the first time I start the
> > topology and the second time it's there already. It could be possible to
> > create topics up front for us, or even use an admin call from inside the
> > code.
>
> Yes, that (i.e. you are running with auto-topic creation enabled) was what
> I implicitly understood.  As covered in [1], we strongly recommend
> manually pre-creating and managing user topics though.  User topics include
> the source topics that you are reading from (cf. `stream()`, `table()`) but
> also the topics you use in `through()` and `to()`.
>
>
> > Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
>
> There is already work being done in the Admin API (KIP-4), and part of this
> functionality was released in the latest Kafka versions.  You can use this
> to programmatically create topics, for example.  Note though that the work
> on KIP-4 is not fully completed yet.
>
> -Michael
>
>
>
>
> [1]
>
> http://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application
>
>
>
> On Thu, Nov 24, 2016 at 3:55 PM, Mikael Högqvist <hoegqvist@gmail.com>
> wrote:
>
> > Sure, I guess the topic is auto-created the first time I start the
> > topology and the second time it's there already. It could be possible to
> > create topics up front for us, or even use an admin call from inside the
> > code.
> >
> > That said, as a user, I think it would be great with a function in the
> > Kafka Streams DSL that would allow me to materialize a KTable without
> > pre-creating the topic. Today there is .through(topic, store) and
> > .to(topic), maybe it would be possible to have something like
> > .materialize(store) which takes care of topic creation? Would adding
> > something like this require a KIP?
> >
> > Best,
> > Mikael
> >
> > On Thu, Nov 24, 2016 at 1:44 PM Damian Guy <damian.guy@gmail.com> wrote:
> >
> > Mikael,
> >
> > When you use `through(..)`, topics are not created by Kafka Streams. You
> > need to create them yourself before you run the application.
> >
> > Thanks,
> > Damian
> >
> > On Thu, 24 Nov 2016 at 11:27 Mikael Högqvist <hoegqvist@gmail.com>
> wrote:
> >
> > > Yes, the naming is not an issue.
> > >
> > > I've tested this with the topology described earlier. Every time I start
> > > the topology with a call to .through() that references a topic that does
> > > not exist, I get an exception from the UncaughtExceptionHandler:
> > >
> > > Uncaught exception org.apache.kafka.streams.errors.StreamsException:
> > > Topic not found during partition assignment: words-count-changelog
> > >
> > > This happens when .through("words-count-changelog", "count") is part of
> > > the topology. The topology is also not forwarding anything to that
> > > topic/store. After restarting the application it works fine.
> > >
> > > Are the changelog topics created via, for example, .aggregate() different
> > > to topics auto created via .through()?
> > >
> > > Thanks,
> > > Mikael
> > >
> > > On Wed, Nov 23, 2016 at 7:57 PM Matthias J. Sax <matthias@confluent.io
> >
> > > wrote:
> > >
> > > > > 1) Create a state store AND the changelog topic 2) follow the Kafka
> > > > > Streams naming convention for changelog topics. Basically, I want to
> > > > > have a method that does what .through() is supposed to do according to
> > > > > the documentation, but without the "topic" parameter.
> > > >
> > > > I understand what you are saying, but you can get this done right now,
> > > > too. If you use through(...) you will get the store. And you can just
> > > > specify the topic name as "applicationId-storeName-changelog" to follow
> > > > the naming convention Streams uses internally. What is the problem with
> > > > using this approach (besides that you have to provide the topic name,
> > > > which seems not to be a big burden to me)?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 11/23/16 8:59 AM, Mikael Högqvist wrote:
> > > > > Hi Michael,
> > > > >
> > > > > thanks for the extensive explanation, and yes it definitely helps with
> > > > > my understanding of through(). :)
> > > > >
> > > > > You guessed correctly that I'm doing some "shenanigans" where I'm trying
> > > > > to derive the changelog of a state store from the state store name. This
> > > > > works perfectly fine with a naming convention for the topics and by
> > > > > creating them in Kafka upfront.
> > > > >
> > > > > My point is that it would help me (and maybe others), if the API of
> > > > > KTable was extended to have a new method that does two things that are
> > > > > not part of the implementation of .through(). 1) Create a state store AND
> > > > > the changelog topic 2) follow the Kafka Streams naming convention for
> > > > > changelog topics. Basically, I want to have a method that does what
> > > > > .through() is supposed to do according to the documentation, but without
> > > > > the "topic" parameter.
> > > > >
> > > > > What do you think, would it be possible to extend the API with a
> > method
> > > > > like that?
> > > > >
> > > > > Thanks,
> > > > > Mikael
> > > > >
> > > > > On Wed, Nov 23, 2016 at 4:16 PM Michael Noll <michael@confluent.io
> >
> > > > wrote:
> > > > >
> > > > >> Mikael,
> > > > >>
> > > > >> regarding your second question:
> > > > >>
> > > > >>> 2) Regarding the use case, the topology looks like this:
> > > > >>>
> > > > >>> .stream(...)
> > > > >>> .aggregate(..., "store-1")
> > > > >>> .mapValues(...)
> > > > >>> .through(..., "store-2")
> > > > >>
> > > > > >> The last operator above would, without the "..." ellipsis, be something
> > > > > >> like `KTable#through("through-topic", "store-2")`.  Here,
> > > > > >> "through-topic" is the changelog topic for both the KTable and the
> > > > > >> state store "store-2".  So this is the changelog topic name that you
> > > > > >> want to know.
> > > > >>
> > > > > >> - If you want the "through" topic to have a `-changelog` suffix, then
> > > > > >> you'd need to add that yourself in the call to `through(...)`.
> > > > > >>
> > > > > >> - If you wonder why `through()` doesn't add a `-changelog` suffix
> > > > > >> automatically:  That's because `through()` -- like `to()` or
> > > > > >> `stream()`, `table()` -- requires you to explicitly provide a topic
> > > > > >> name, and of course Kafka will use exactly this name.  (FWIW, the
> > > > > >> `-changelog` suffix is only added when Kafka creates internal changelog
> > > > > >> topics behind the scenes for you.)   Unfortunately, the javadocs of
> > > > > >> `KTable#through()` are incorrect because they refer to `-changelog`;
> > > > > >> we'll fix that as mentioned above.
> > > > > >>
> > > > > >> - Also, in case you want to do some shenanigans (like for some tooling
> > > > > >> you're building around state stores/changelogs/interactive queries)
> > > > > >> such as detecting all state store changelogs by doing the equivalent of
> > > > > >> `ls *-changelog`, then this will miss changelogs of KTables that are
> > > > > >> created by `through()` and `to()` (unless you come up with a naming
> > > > > >> convention that your tooling can assume to be in place, e.g. by always
> > > > > >> adding `-changelog` to topic names when you call `through()`).
> > > > >>
> > > > >> I hope this helps!
> > > > >> Michael
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Nov 23, 2016 at 7:39 AM, Mikael Högqvist <
> > hoegqvist@gmail.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Eno,
> > > > >>>
> > > > >>> 1) Great :)
> > > > >>>
> > > > > >>> 2) Yes, we are using the Interactive Queries to access the state
> > > > > >>> stores. In addition, we access the changelogs to subscribe to updates.
> > > > > >>> For this reason we need to know the changelog topic name.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Mikael
> > > > >>>
> > > > >>> On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <
> > eno.thereska@gmail.com
> > > >
> > > > >>> wrote:
> > > > >>>
> > > > > >>>> Hi Mikael,
> > > > > >>>>
> > > > > >>>> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is
> > > > > >>>> looking into fixing it. I agree that it can be confusing to have topic
> > > > > >>>> names that are not what one would expect.
> > > > > >>>>
> > > > > >>>> 2) If your goal is to query/read from the state stores, you can use
> > > > > >>>> Interactive Queries to do that (you don't need to worry about the
> > > > > >>>> changelog topic name and such). Interactive Queries is a new feature in
> > > > > >>>> 0.10.1 (blog here:
> > > > > >>>> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> > > > > >>>> ).
> > > > >>>>
> > > > >>>> Thanks
> > > > >>>> Eno
> > > > >>>>
> > > > >>>>
> > > > >>>>> On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com
> >
> > > > >> wrote:
> > > > >>>>>
> > > > > >>>>> Sorry for being unclear, I'll try again :)
> > > > > >>>>>
> > > > > >>>>> 1) The JavaDoc for through is not correct, it states that a changelog
> > > > > >>>>> topic will be created for the state store. That is, if I would call it
> > > > > >>>>> with through("topic", "a-store"), I would expect a kafka topic
> > > > > >>>>> "my-app-id-a-store-changelog" to be created.
> > > > > >>>>>
> > > > > >>>>> 2) Regarding the use case, the topology looks like this:
> > > > >>>>>
> > > > >>>>> .stream(...)
> > > > >>>>> .aggregate(..., "store-1")
> > > > >>>>> .mapValues(...)
> > > > >>>>> .through(..., "store-2")
> > > > >>>>>
> > > > > >>>>> Basically, I want to materialize both the result from the aggregate
> > > > > >>>>> method and the result from mapValues, which is materialized using
> > > > > >>>>> .through(). Later, I will access both the tables (store-1 and store-2)
> > > > > >>>>> to a) get the current state of the aggregate, b) subscribe to future
> > > > > >>>>> updates. This works just fine. The only issue is that I assumed a
> > > > > >>>>> changelog topic for store-2 would be created automatically, which
> > > > > >>>>> didn't happen.
> > > > > >>>>>
> > > > > >>>>> Since I want to access the changelog topic, it helps if the naming is
> > > > > >>>>> consistent. So either we enforce the same naming pattern as Kafka when
> > > > > >>>>> calling .through() or alternatively the Kafka Streams API can provide a
> > > > > >>>>> method to materialize tables which creates a topic name according to
> > > > > >>>>> the naming pattern. E.g. .through() without the topic parameter.
> > > > >>>>>
> > > > >>>>> What do you think?
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Mikael
> > > > >>>>>
> > > > >>>>> On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <
> > > > >> matthias@confluent.io
> > > > >>>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > > >>>>>> I cannot completely follow what you want to achieve.
> > > > > >>>>>>
> > > > > >>>>>> However, the JavaDoc for through() seems not to be correct to me.
> > > > > >>>>>> Using through() will not create an extra internal changelog topic with
> > > > > >>>>>> the described naming schema, because the topic specified in through()
> > > > > >>>>>> can be used for this (there is no point in duplicating the data).
> > > > > >>>>>>
> > > > > >>>>>> If you have a KTable and apply a mapValues(), this will not write data
> > > > > >>>>>> to any topic. The derived KTable is in-memory because you can easily
> > > > > >>>>>> recreate it from its base KTable.
> > > > > >>>>>>
> > > > > >>>>>> What is the missing part you want to get?
> > > > > >>>>>>
> > > > > >>>>>> Btw: the internally created changelog topics are only used for recovery
> > > > > >>>>>> in case of failure. Streams does not consume from those topics during
> > > > > >>>>>> "normal operation".
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> > > > >>>>>>> Hi,
> > > > >>>>>>>
> > > > > >>>>>>> in the documentation for KTable#through, it is stated that a new
> > > > > >>>>>>> changelog topic will be created for the table. It also states that
> > > > > >>>>>>> calling through is equivalent to calling #to followed by
> > > > > >>>>>>> KStreamBuilder#table.
> > > > > >>>>>>>
> > > > > >>>>>>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> > > > > >>>>>>>
> > > > > >>>>>>> In the docs for KStreamBuilder#table it is stated that no new changelog
> > > > > >>>>>>> topic will be created since the underlying topic acts as the changelog.
> > > > > >>>>>>> I've verified that this is the case.
> > > > > >>>>>>>
> > > > > >>>>>>> Is there another API method to materialize the results of a KTable
> > > > > >>>>>>> including a changelog, i.e. such that kafka streams creates the topic
> > > > > >>>>>>> and uses the naming schema for changelog topics? The use case I have in
> > > > > >>>>>>> mind is aggregate followed by mapValues.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Mikael
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
