kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "John Roesler" <vvcep...@apache.org>
Subject Re: How to set concrete names for state stores and internal topics backed by these
Date Fri, 06 Dec 2019 17:32:01 GMT
Hi Sachin,

The way that Java infers generic arguments makes that case particularly obnoxious.

By the way, the problem you're facing is specifically addressed by these relatively new features:
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
* https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping

Since this behavior has been under development recently, I thought you might benefit from
the context.

To answer your question, what you have to do is explicitly mention the type arguments to "Materialized.as(name)"
when you're using the withKeySerde, etc. 

It will look something like this:

Materialized
  .<KeyType, ValueType KeyValueStore<Bytes, byte[]>>as("store-name")
  .withKeySerde(new Serde<KeyType>...)
  .withValueSerde(new Serde<ValueType>...));

I can explain exactly why this is necessary if you want, but the short answer is that the
Java type system only makes a rudimentary effort to infer types.

FWIW, this "paper cut" makes me irrationally angry, and I'm hoping we can find a way to fix
it, if we ever change the Materialized builder interface.

Hope this helps,
-John

On Fri, Dec 6, 2019, at 11:15, Sachin Mittal wrote:
> Hi,
> In my application I have names of internal topics like this:
> 
> ss-session-application-KSTREAM-JOINOTHER-0000000059-store-changelog-0
> ss-session-application-KSTREAM-JOINTHIS-0000000049-store-changelog-0
> ss-session-application-KSTREAM-OUTEROTHER-0000000050-store-changelog-0
> ss-session-application-KTABLE-MERGE-STATE-STORE-0000000061-changelog-0
> 
> Is it possible to set concrete names for these instead of say **
> KSTREAM-JOINOTHER-0000000059-store**
> 
> This way I can identify at what code in my DSL is responsible for data
> inside them.
> 
> So far I have set names for:
> Grouped.with
> Materialized.as
> Joined.with
> 
> This has helped me get concrete names at many places however still at some
> places I see arbitrary names.
> 
> Also note that somehow this code works
> Materialized.with(new JSONSerde<K>(), new TupleJSONSerde<Pair<L, V>>())
> 
> But not:
> Materialized.as("d-l-i-store").withKeySerde(new
> JSONSerde<K>()).withValueSerde(new TupleJSONSerde<Pair<L, V>>())
> 
> The error I get is:
> Description Resource Path Location Type
> The method withKeySerde(Serde<Object>) in the type
> Materialized<Object,Object,StateStore> is not applicable for the arguments
> (JSONSerde<K>)
> 
> I have my class
> 
> class JSONSerde<T extends JSONSerdeCompatible> implements Serializer<T>,
> Deserializer<T>, Serde<T> {
> ......
> }
> 
> This is pretty much same as from kafka streams typed example.
> 
> Thanks
> Sachin
>

Mime
View raw message