kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: How to set concrete names for state stores and internal topics backed by these
Date Sat, 07 Dec 2019 09:20:01 GMT
Hi John,
This was very helpful. However, I am still confused about when to set the
names for Materialized and Grouped.
I am setting the names so that the state stores and internal topics have
definite, identifiable names for debugging purposes.

So when we set a name, do we also need to set the serdes for the key/value
types? If not, what defaults are used?

Here is a quick example.
My original code was:
table = stream.map((k, v) -> ...).groupByKey().reduce((av, nv) -> nv)

To give the intermediate stores/topics names, I changed the code to:
table = stream
  .map((k, v) -> ...)
  .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
  .reduce((av, nv) -> nv, Materialized.as("store"))

So I wanted to know: once I create a named Materialized, do I need to set
its key/value serdes too?
So is this better code?
table = stream
  .map((k, v) -> ...)
  .groupByKey(Grouped.with("group", Serde<K>, Serde<V>))
  .reduce((av, nv) -> nv,
      Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as("store-name")
          .withKeySerde(Serde<K>)
          .withValueSerde(Serde<V>))

Note that I have custom classes for the key and value.

Thanks
Sachin
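On the serde question: as far as I can tell from the Kafka Streams implementation of `reduce()` (an assumption; please verify against the version you run), a store whose `Materialized` carries no serdes inherits the key and value serdes of the grouped stream, i.e. the ones passed to `Grouped.with(...)`, and anything still unset falls back to `default.key.serde` / `default.value.serde` from the streams config. For `aggregate()` only the key serde would be inherited, since the aggregate value type can differ from the input value type. The sketch below is a hypothetical model of that lookup order, not Kafka code; `SerdeFallback` and `effectiveSerde` are made-up names, and plain strings stand in for serde instances.

```java
// Hypothetical model, not Kafka Streams code: the order in which a reduce()
// store is assumed to pick its key (or value) serde. Strings stand in for serdes.
final class SerdeFallback {

    static String effectiveSerde(String fromMaterialized, String fromGrouped, String configDefault) {
        if (fromMaterialized != null) {
            return fromMaterialized; // set explicitly via withKeySerde/withValueSerde
        }
        if (fromGrouped != null) {
            return fromGrouped;      // inherited from Grouped.with(name, keySerde, valueSerde)
        }
        return configDefault;        // default.key.serde / default.value.serde
    }

    public static void main(String[] args) {
        // Materialized.as("store") with no serdes, after Grouped.with("group", ...):
        System.out.println(effectiveSerde(null, "JSONSerde<K>", "ConfigDefaultSerde"));
        // Neither Materialized nor Grouped set a serde:
        System.out.println(effectiveSerde(null, null, "ConfigDefaultSerde"));
    }
}
```

Under that assumption, once `Grouped.with("group", keySerde, valueSerde)` is in place upstream, `Materialized.as("store")` alone should be enough for `reduce()`; setting the same serdes again on `Materialized` is redundant but harmless.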



On Fri, Dec 6, 2019 at 11:02 PM John Roesler <vvcephei@apache.org> wrote:

> Hi Sachin,
>
> The way that Java infers generic arguments makes that case particularly
> obnoxious.
>
> By the way, the problem you're facing is specifically addressed by these
> relatively new features:
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
>
> Since this behavior has been under development recently, I thought you
> might benefit from the context.
>
> To answer your question, what you have to do is explicitly specify the
> type arguments to "Materialized.as(name)" when you're using
> withKeySerde, etc.
>
> It will look something like this:
>
> Materialized
>   .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
>   .withKeySerde(new Serde<KeyType>...)
>   .withValueSerde(new Serde<ValueType>...);
>
> I can explain exactly why this is necessary if you want, but the short
> answer is that the Java type system only makes a rudimentary effort to
> infer types: it does not use the calls chained after "as(...)" to work out
> its type arguments.
>
> FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can
> find a way to fix it, if we ever change the Materialized builder interface.
>
> Hope this helps,
> -John
>
> On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> > Hi,
> > In my application I have names of internal topics like this:
> >
> > ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> > ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> > ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> > ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> >
> > Is it possible to set concrete names for these instead of, say,
> > KSTREAM-JOINOTHER-0000000059-store?
> >
> > That way I can identify which code in my DSL is responsible for the data
> > inside them.
> >
> > So far I have set names for:
> > Grouped.with
> > Materialized.as
> > Joined.with
> >
> > This has helped me get concrete names in many places; however, in some
> > places I still see arbitrary names.
> >
> > Also note that somehow this code works:
> > Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> >
> > But this does not:
> > Materialized.as("d-l-i-store").withKeySerde(new JSONSerde<K>()).withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> >
> > The error I get is:
> > The method withKeySerde(Serde<Object>) in the type
> > Materialized<Object,Object,StateStore> is not applicable for the arguments
> > (JSONSerde<K>)
> >
> > My class is:
> >
> > class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>, Deserializer<T>, Serde<T> {
> > ......
> > }
> >
> > This is pretty much the same as the Kafka Streams typed example.
> >
> > Thanks
> > Sachin
> >
>
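The internal topic names in the original question follow a fixed pattern, which is why naming stores and grouping operations makes them recognizable. To my knowledge, a state store's changelog topic is named `<application.id>-<store name>-changelog`, and with KIP-372 a named grouping or join repartition topic is `<application.id>-<name>-repartition`. The trailing `-0` on the listed names looks like a partition number appended in `TopicPartition.toString()` style; that is an assumption about where the list came from. The helper below is a hypothetical sketch (`InternalTopicNames` and its methods are made up, not part of Kafka Streams) that just reproduces those patterns with string concatenation.

```java
// Hypothetical helper (not part of Kafka Streams) reproducing the internal
// topic naming patterns described above.
final class InternalTopicNames {

    // "<application.id>-<storeName>-changelog"
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // "<application.id>-<groupedOrJoinedName>-repartition"
    static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Drops a trailing "-<partition>" suffix, e.g. "...-changelog-0" -> "...-changelog".
    static String topicOf(String topicPartition) {
        int dash = topicPartition.lastIndexOf('-');
        return topicPartition.substring(0, dash);
    }

    public static void main(String[] args) {
        String app = "ss-session-application";
        // Auto-generated store name from the question:
        System.out.println(changelogTopic(app, "KSTREAM-JOINOTHER-0000000059-store"));
        // Store named via Materialized.as("store"):
        System.out.println(changelogTopic(app, "store"));
        // Repartition topic for Grouped.with("group", ...) after a key-changing map():
        System.out.println(repartitionTopic(app, "group"));
    }
}
```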

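The compile error in the original question comes from how Java infers generic type arguments. `Materialized.with(keySerde, valueSerde)` works because K and V are inferred from its arguments. `Materialized.as(name)` takes only a `String`, so K and V can only come from a target type, and a call used as the receiver of `.withKeySerde(...)` has no target type; K and V therefore default to `Object`, which matches the `Materialized<Object,Object,StateStore>` in the error. Passing `as(...)` directly as a method argument (as in `reduce(..., Materialized.as("store"))`) or assigning it to a typed variable does supply a target type. The self-contained sketch below reproduces this with a hypothetical `StoreSpec` builder shaped like `Materialized`; `StoreSpec`, `Codec`, `StringCodec`, `LongCodec`, and `InferenceDemo` are made up for illustration.

```java
// Hypothetical stand-in for Serde<T>.
interface Codec<T> {
    String encode(T value);
}

final class StringCodec implements Codec<String> {
    public String encode(String value) { return "s:" + value; }
}

final class LongCodec implements Codec<Long> {
    public String encode(Long value) { return "l:" + value; }
}

// Hypothetical builder shaped like Materialized<K, V, S> (without S).
final class StoreSpec<K, V> {
    final String name;
    final Codec<K> keyCodec;
    final Codec<V> valueCodec;

    private StoreSpec(String name, Codec<K> keyCodec, Codec<V> valueCodec) {
        this.name = name;
        this.keyCodec = keyCodec;
        this.valueCodec = valueCodec;
    }

    // Like Materialized.as(String): K and V can only come from a target type.
    static <K, V> StoreSpec<K, V> as(String name) {
        return new StoreSpec<>(name, null, null);
    }

    // Like Materialized.with(Serde<K>, Serde<V>): K and V come from the arguments.
    static <K, V> StoreSpec<K, V> with(Codec<K> keyCodec, Codec<V> valueCodec) {
        return new StoreSpec<>(null, keyCodec, valueCodec);
    }

    StoreSpec<K, V> withKeyCodec(Codec<K> codec) {
        return new StoreSpec<>(name, codec, valueCodec);
    }

    StoreSpec<K, V> withValueCodec(Codec<V> codec) {
        return new StoreSpec<>(name, keyCodec, codec);
    }
}

final class InferenceDemo {

    // Stand-in for reduce(reducer, materialized): its parameter type is a target type.
    static String use(StoreSpec<String, Long> spec) {
        return spec.name;
    }

    // Compiles: K and V are inferred from the codec arguments.
    static StoreSpec<String, Long> fromArguments() {
        return StoreSpec.with(new StringCodec(), new LongCodec());
    }

    // Compiles: as() is a method argument, so the parameter type drives inference.
    static String asArgument() {
        return use(StoreSpec.as("store"));
    }

    // Compiles: explicit type arguments, the fix suggested in the thread.
    static StoreSpec<String, Long> withTypeWitness() {
        return StoreSpec.<String, Long>as("d-l-i-store")
                .withKeyCodec(new StringCodec())
                .withValueCodec(new LongCodec());
    }

    // Compiles: the assignment gives as() a target type before chaining.
    static StoreSpec<String, Long> withTypedVariable() {
        StoreSpec<String, Long> spec = StoreSpec.as("d-l-i-store");
        return spec.withKeyCodec(new StringCodec()).withValueCodec(new LongCodec());
    }

    // Does NOT compile: as() is the receiver of withKeyCodec, so K and V become
    // Object and StringCodec is not a Codec<Object>:
    // StoreSpec.as("d-l-i-store").withKeyCodec(new StringCodec());
}
```

The typed-variable form is an alternative to the explicit type witness; with the real API it would read `Materialized<K, V, KeyValueStore<Bytes, byte[]>> m = Materialized.as("store-name");` followed by the `withKeySerde`/`withValueSerde` calls on `m`.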