kafka-users mailing list archives

From Gioacchino Vino <gioacchinov...@gmail.com>
Subject Kafka Streams - Java StreamException - Serdes issue
Date Tue, 09 Apr 2019 17:14:32 GMT
Hi experts,

As far as I understand, I need to set the serde for the Double type in or 
after the map function, because of the repartition step.

I can't figure out where to specify it. I've already looked for the 
answer in the documentation and in articles, but without success.

The following code

KStream<String, Triplet<String, Double, String>> sum_data = ...

KStream<String, String> aggr_stream = sum_data.selectKey((key,value) -> value.getValue0())
.mapValues((key,value) -> value.getValue1())
.reduce((v1,v2) -> v1 + v2 )
.map((key,value) -> new 

produces a StreamsException:

Caused by: org.apache.kafka.streams.errors.StreamsException: A 
serializer (key: org.apache.kafka.common.serialization.StringSerializer 
/ value: org.apache.kafka.common.serialization.StringSerializer) is not 
compatible to the actual key or value type (key type: java.lang.String / 
value type: java.lang.Double). Change the default Serdes in StreamConfig 
or provide correct Serdes via method parameters.

This happens because the default Serdes for both key and value are String.

Of course, if I force the String type, for example with the following code

KStream<String, String> aggr_stream = sum_data.selectKey((key,value) -> 
.mapValues((key,value) -> value.getValue1().toString())
.reduce((v1,v2) -> new Double( Double.parseDouble(v1) + 
Double.parseDouble(v2) ).toString() )
.map((key,value) -> new 

everything works, but of course it's not the best way to do it.
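
For comparison, here is a minimal sketch of what "provide correct Serdes via 
method parameters" from the error message might look like, keeping the values 
as Double. It is not the code from this thread: the topic names, the class 
name SerdeSketch and the simplified KStream<String, Double> input (instead of 
the Triplet values) are assumptions made only for illustration. It also 
assumes Kafka Streams 2.1 or later, where Grouped is available; older versions 
used Serialized in the same position.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class SerdeSketch {

    // Builds a topology that sums Double values per key without
    // converting them to String for the repartition step.
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical input topic with String keys and Double values.
        KStream<String, Double> values = builder.stream(
                "input-topic", Consumed.with(Serdes.String(), Serdes.Double()));

        KTable<String, Double> sums = values
                // Stand-in for the re-keying step; selectKey marks the
                // stream as needing a repartition before aggregation.
                .selectKey((key, value) -> key)
                // Serdes used for the repartition topic: String key, Double value.
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
                // Serdes used for the state store backing the reduce.
                .reduce(Double::sum,
                        Materialized.with(Serdes.String(), Serdes.Double()));

        // Convert to String only at the very end, when writing the result.
        sums.toStream()
            .mapValues(sum -> sum.toString())
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

The idea is that the default String Serdes stay untouched in the config, and 
only the operations that actually handle Double values (the grouping and the 
reduce) receive an explicit Double serde.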

Could someone help me?

Thanks in advance,

