kafka-users mailing list archives

From Patrik Kleindl <pklei...@gmail.com>
Subject Re: How to set concrete names for state stores and internal topics backed by these
Date Fri, 06 Dec 2019 19:24:27 GMT
Hi Sachin

We are using a small helper method to keep this readable:

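private <K, V, S extends StateStore> Materialized<K, V, S> materializedWith(
        String name, Serde<K> keySerde, Serde<V> valueSerde) {
    // Assigning to a typed local gives Materialized.as(name) a target type,
    // so K, V and S are fixed before the with...Serde calls are chained.
    Materialized<K, V, S> materialized = Materialized.as(name);
    return materialized.withKeySerde(keySerde).withValueSerde(valueSerde);
}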

So the Materialized.as call just becomes

materializedWith("storename", keySerde, valueSerde)

Hope that helps

Patrik
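
A minimal, self-contained sketch of why the helper above (and the explicit type arguments John describes below) get past the compiler. It deliberately does not use Kafka Streams: `Mat`, `Serde`, `StringSerde` and `LongSerde` are hypothetical stand-ins, reduced to the shape that matters here (a static generic factory whose type parameters appear only in the return type), so it compiles with the JDK alone.

```java
public class InferenceDemo {

    // Hypothetical stand-in for Serde<T>; only the type parameter matters here.
    interface Serde<T> {}

    static final class StringSerde implements Serde<String> {}
    static final class LongSerde implements Serde<Long> {}

    // Hypothetical stand-in for Materialized, reduced to two type parameters.
    static final class Mat<K, V> {
        final String name;
        Serde<K> keySerde;
        Serde<V> valueSerde;

        private Mat(String name) { this.name = name; }

        // Like Materialized.as(name): K and V appear only in the return type,
        // so the compiler can infer them only from a target type or a witness.
        static <K, V> Mat<K, V> as(String name) { return new Mat<>(name); }

        Mat<K, V> withKeySerde(Serde<K> serde) { this.keySerde = serde; return this; }
        Mat<K, V> withValueSerde(Serde<V> serde) { this.valueSerde = serde; return this; }
    }

    // Same shape as the helper in the message: the typed local variable
    // supplies the target type for Mat.as(name).
    static <K, V> Mat<K, V> materializedWith(String name, Serde<K> keySerde, Serde<V> valueSerde) {
        Mat<K, V> mat = Mat.as(name);
        return mat.withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public static void main(String[] args) {
        // Does not compile: as(...) is the receiver of another call, so it has
        // no target type and K, V are inferred as Object.
        // Mat<String, Long> bad = Mat.as("store-name").withKeySerde(new StringSerde());

        // Option 1: explicit type arguments on the factory call.
        Mat<String, Long> viaWitness = Mat.<String, Long>as("store-name")
                .withKeySerde(new StringSerde())
                .withValueSerde(new LongSerde());

        // Option 2: the helper, which infers K and V from its serde arguments.
        Mat<String, Long> viaHelper =
                materializedWith("store-name", new StringSerde(), new LongSerde());

        System.out.println(viaWitness.name + " / " + viaHelper.name);
    }
}
```

The helper has the practical advantage that call sites never spell out the type arguments; they fall out of the serdes that are passed in.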


On Fri, 6 Dec 2019 at 18:32, John Roesler <vvcephei@apache.org> wrote:

> Hi Sachin,
>
> The way that Java infers generic arguments makes that case particularly
> obnoxious.
>
> By the way, the problem you're facing is specifically addressed by these
> relatively new features:
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> * https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping
>
> Since this behavior has been under development recently, I thought you
> might benefit from the context.
>
> To answer your question, what you have to do is explicitly mention the
> type arguments to "Materialized.as(name)" when you're using the
> withKeySerde, etc.
>
> It will look something like this:
>
> Materialized
>   .<KeyType, ValueType, KeyValueStore<Bytes, byte[]>>as("store-name")
>   .withKeySerde(new Serde<KeyType>...)
>   .withValueSerde(new Serde<ValueType>...);
>
> I can explain exactly why this is necessary if you want, but the short
> answer is that the Java type system only makes a rudimentary effort to
> infer types.
>
> FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can
> find a way to fix it, if we ever change the Materialized builder interface.
>
> Hope this helps,
> -John
>
> On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> > Hi,
> > In my application I have names of internal topics like this:
> >
> > ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> > ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> > ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> > ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> >
> > Is it possible to set concrete names for these instead of, say,
> > KSTREAM-JOINOTHER-0000000059-store?
> >
> > This way I can identify which code in my DSL is responsible for the data
> > inside them.
> >
> > So far I have set names for:
> > Grouped.with
> > Materialized.as
> > Joined.with
> >
> > This has helped me get concrete names in many places; however, in some
> > places I still see arbitrary names.
> >
> > Also note that somehow this code works
> > Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> >
> > But not:
> > Materialized.as("d-l-i-store")
> >     .withKeySerde(new JSONSerde<K>())
> >     .withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> >
> > The error I get is:
> > Description Resource Path Location Type
> > The method withKeySerde(Serde<Object>) in the type
> > Materialized<Object,Object,StateStore> is not applicable for the arguments
> > (JSONSerde<K>)
> >
> > I have my class
> >
> > class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>,
> > Deserializer<T>, Serde<T> {
> > ......
> > }
> >
> > This is pretty much the same as in the Kafka Streams typed example.
> >
> > Thanks
> > Sachin
> >
>
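
For reference, the changelog names in the original question follow the Kafka Streams convention `<application.id>-<store name>-changelog`, which is why naming a store also fixes the name of its changelog topic. The sketch below only rebuilds that string; `changelogTopic` is a hypothetical helper, not a Kafka API, and it assumes that the application id is `ss-session-application` and that the trailing `-0` in the listing is a partition number rather than part of the topic name.

```java
public class InternalTopicNames {

    // Hypothetical helper: composes a changelog topic name following the
    // convention <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Assumed application id, read off the topic names in the question.
        String applicationId = "ss-session-application";

        // Generated store name: the topic carries the generated name and index.
        System.out.println(changelogTopic(applicationId, "KSTREAM-JOINOTHER-0000000059-store"));

        // Explicit store name, as passed to Materialized.as("d-l-i-store").
        System.out.println(changelogTopic(applicationId, "d-l-i-store"));
    }
}
```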
