kafka-users mailing list archives

From Mikael Högqvist <hoegqv...@gmail.com>
Subject Re: KafkaStreams KTable#through not creating changelog topic
Date Wed, 23 Nov 2016 06:39:05 GMT
Hi Eno,

1) Great :)

2) Yes, we are using Interactive Queries to access the state stores. In
addition, we consume the changelog topics to subscribe to updates. For this
reason we need to know the changelog topic name.
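
A minimal sketch of how the topic to subscribe to could be derived, assuming
the internal naming pattern discussed in this thread
(<application.id>-<store name>-changelog) and assuming that a store created
with through() is backed by the through topic itself. The class and method
names (ChangelogTopics, changelogTopic, updatesTopic) are hypothetical helpers
for illustration and are not part of the Kafka Streams API.

```java
import java.util.Objects;

// Hypothetical helper, not part of Kafka Streams.
final class ChangelogTopics {
    private ChangelogTopics() {}

    // Builds the internal changelog topic name using the pattern
    // <application.id>-<store name>-changelog described in this thread.
    static String changelogTopic(String applicationId, String storeName) {
        Objects.requireNonNull(applicationId, "applicationId");
        Objects.requireNonNull(storeName, "storeName");
        return applicationId + "-" + storeName + "-changelog";
    }

    // Picks the topic to consume for updates of a store.
    // Assumption: if the store was materialized with through(topic, store),
    // no internal changelog exists and the through topic is used instead.
    // Pass null as throughTopic for stores created by e.g. aggregate().
    static String updatesTopic(String applicationId, String storeName, String throughTopic) {
        return throughTopic != null
                ? throughTopic
                : changelogTopic(applicationId, storeName);
    }

    public static void main(String[] args) {
        // store-1 comes from aggregate(), store-2 from through("topic", "store-2").
        System.out.println(updatesTopic("my-app-id", "store-1", null));
        System.out.println(updatesTopic("my-app-id", "store-2", "topic"));
    }
}
```

With a helper like this, the consumer side only needs the application id, the
store name, and (for through() stores) the topic that was passed explicitly.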

Thanks,
Mikael

On Tue, Nov 22, 2016 at 8:43 PM Eno Thereska <eno.thereska@gmail.com> wrote:

> Hi Mikael,
>
> 1) The JavaDoc looks incorrect, thanks for reporting. Matthias is looking
> into fixing it. I agree that it can be confusing to have topic names that
> are not what one would expect.
>
> 2) If your goal is to query/read from the state stores, you can use
> Interactive Queries to do that (you don't need to worry about the changelog
> topic name and such). Interactive Queries is a new feature in 0.10.1 (blog
> here:
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
> ).
>
> Thanks
> Eno
>
>
> > On 22 Nov 2016, at 19:27, Mikael Högqvist <hoegqvist@gmail.com> wrote:
> >
> > Sorry for being unclear, I'll try again :)
> >
> > 1) The JavaDoc for through() is not correct: it states that a changelog
> > topic will be created for the state store. That is, if I call
> > through("topic", "a-store"), I would expect a Kafka topic
> > "my-app-id-a-store-changelog" to be created.
> >
> > 2) Regarding the use case, the topology looks like this:
> >
> > .stream(...)
> > .aggregate(..., "store-1")
> > .mapValues(...)
> > .through(..., "store-2")
> >
> > Basically, I want to materialize both the result from the aggregate
> > method and the result from mapValues, which is materialized using
> > .through(). Later, I will access both tables (store-1 and store-2) to
> > a) get the current state of the aggregate and b) subscribe to future
> > updates. This works just fine. The only issue is that I assumed a
> > changelog topic for store-2 would be created automatically, which
> > didn't happen.
> >
> > Since I want to access the changelog topic, it helps if the naming is
> > consistent. So either we enforce the same naming pattern as kafka when
> > calling .through() or alternatively the Kafka Streams API can provide a
> > method to materialize tables which creates a topic name according to the
> > naming pattern. E.g. .through() without the topic parameter.
> >
> > What do you think?
> >
> > Best,
> > Mikael
> >
> > On Tue, Nov 22, 2016 at 7:21 PM Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> I cannot completely follow what you want to achieve.
> >>
> >> However, the JavaDoc for through() seems not to be correct to me. Using
> >> through() will not create an extra internal changelog topic with the
> >> described naming schema, because the topic specified in through() can be
> >> used for this (there is no point in duplicating the data).
> >>
> >> If you have a KTable and apply a mapValues(), this will not write data
> >> to any topic. The derived KTable is in-memory because you can easily
> >> recreate it from its base KTable.
> >>
> >> What is the missing part you want to get?
> >>
> >> Btw: the internally created changelog topics are only used for recovery
> >> in case of failure. Streams does not consume from those topics during
> >> "normal operation".
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 11/22/16 1:59 AM, Mikael Högqvist wrote:
> >>> Hi,
> >>>
> >>> in the documentation for KTable#through, it is stated that a new
> >>> changelog topic will be created for the table. It also states that
> >>> calling through is equivalent to calling #to followed by
> >>> KStreamBuilder#table.
> >>>
> >>>
> >>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/streams/kstream/KTable.html#through(org.apache.kafka.common.serialization.Serde,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String,%20java.lang.String)
> >>>
> >>> In the docs for KStreamBuilder#table it is stated that no new changelog
> >>> topic will be created since the underlying topic acts as the changelog.
> >>> I've verified that this is the case.
> >>>
> >>> Is there another API method to materialize the results of a KTable
> >>> including a changelog, i.e. such that Kafka Streams creates the topic
> >>> and uses the naming schema for changelog topics? The use case I have
> >>> in mind is aggregate followed by mapValues.
> >>>
> >>> Best,
> >>> Mikael
> >>>
> >>
> >>
>
>
