kafka-users mailing list archives

From Gioacchino Vino <gioacchinov...@gmail.com>
Subject Re: Kafka Streams - Java StreamException - Serdes issue
Date Wed, 10 Apr 2019 09:22:55 GMT
Hi Bill,


Thanks so much for your help, but unfortunately it did not work; I got the 
same error trace.

I found where the issue was: in groupByKey, as described here 
<https://docs.confluent.io/current/streams/developer-guide/datatypes.html#streams-developer-guide-serdes>

Adding Grouped.with(Serdes.String(), Serdes.Double()) to the 
groupByKey call makes everything work:


sum_data.selectKey((key, value) -> value.getValue0())
        .mapValues((key, value) -> value.getValue1())
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .reduce((v1, v2) -> v1 + v2)
        .toStream()
        .map((key, value) -> new KeyValue<String, String>(key.toString(), key.toString() + "->" + value.toString()));
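As an aside, the semantics of the fixed pipeline (sum Double values per key in 10-second tumbling windows) can be mimicked in plain Java without the Kafka Streams dependency, which also makes the key/value types explicit. This is only an illustrative sketch: the Record type, the windowedSum method, and the "key@windowStart" string format are made up for this example and are not part of the Kafka API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedSumSketch {

    // Illustrative input record: (key, value, event timestamp in ms).
    public record Record(String key, double value, long timestampMs) {}

    // Group records by (key, 10-second window start) and sum the values,
    // mirroring groupByKey().windowedBy(TimeWindows.of(...)).reduce((v1, v2) -> v1 + v2).
    public static Map<String, Double> windowedSum(List<Record> records, long windowSizeMs) {
        Map<String, Double> sums = new LinkedHashMap<>();
        for (Record r : records) {
            // Align the timestamp to the start of its tumbling window.
            long windowStart = (r.timestampMs() / windowSizeMs) * windowSizeMs;
            String windowedKey = r.key() + "@" + windowStart;
            sums.merge(windowedKey, r.value(), Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Record> input = List.of(
            new Record("a", 1.5, 1_000),
            new Record("a", 2.5, 4_000),   // same 10s window as the record above
            new Record("a", 3.0, 12_000),  // falls into the next window
            new Record("b", 7.0, 2_000));
        System.out.println(windowedSum(input, 10_000));
        // prints {a@0=4.0, a@10000=3.0, b@0=7.0}
    }
}
```

The point of the sketch is the type flow: after mapValues the values are Double, so the repartition topic behind groupByKey must serialize Doubles, which is exactly what Grouped.with(Serdes.String(), Serdes.Double()) declares.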


Thanks,

Gioacchino


On 09/04/2019 20:44, Bill Bejeck wrote:
> Hi Gioacchino,
>
> If I'm understanding your topology correctly it looks like you are doing a
> reduce operation where the result is a double.
>
> For stateful operations, Kafka Streams uses persistent state stores for
> keeping track of the update stream.  When using the
> KGroupedStream#reduce method,
> if you don't provide a Materialized instance specifying which serdes to use
> for serializing/deserializing the records to/from RocksDB Streams will use
> the default Serdes specified in the config, but the defaults may not always
> match up with what you need, as you have found out.
>
> I believe if you try ...reduce((v1, v2) -> v1 + v2,
> Materialized.with(Serdes.String(), Serdes.Double()))... (I'm assuming your
> keys are of type String here) it should solve your issue.
>
> HTH,
> Bill
>
> On Tue, Apr 9, 2019 at 1:14 PM Gioacchino Vino <gioacchinovino@gmail.com>
> wrote:
>
>> Hi experts,
>>
>>
>> I believe I need to set the serde for the Double type after/in the map
>> function, because of the repartitioning step.
>>
>> I can't figure out where to specify it. I've already searched the
>> documentation and articles for an answer, but without success.
>>
>> The following code
>>
>>
>> KStream<String, Triplet<String, Double, String>> sum_data = ...
>>
>> KStream<String, String> aggr_stream =
>>     sum_data.selectKey((key, value) -> value.getValue0())
>>             .mapValues((key, value) -> value.getValue1())
>>             .groupByKey()
>>             .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>>             .reduce((v1, v2) -> v1 + v2)
>>             .toStream()
>>             .map((key, value) -> new KeyValue<String, String>(key.toString(), key.toString() + "->" + value.toString()));
>>
>>
>> produces a StreamsException:
>>
>> Caused by: org.apache.kafka.streams.errors.StreamsException: A
>> serializer (key: org.apache.kafka.common.serialization.StringSerializer
>> / value: org.apache.kafka.common.serialization.StringSerializer) is not
>> compatible to the actual key or value type (key type: java.lang.String /
>> value type: java.lang.Double). Change the default Serdes in StreamConfig
>> or provide correct Serdes via method parameters.
>>
>> This is expected, since the default Serdes (both key and value) are String.
>>
>> Of course, if I force the String type, for example using the following code,
>>
>>
>> KStream<String, String> aggr_stream =
>>     sum_data.selectKey((key, value) -> value.getValue0())
>>             .mapValues((key, value) -> value.getValue1().toString())
>>             .groupByKey()
>>             .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>>             .reduce((v1, v2) -> Double.toString(Double.parseDouble(v1) + Double.parseDouble(v2)))
>>             .toStream()
>>             .map((key, value) -> new KeyValue<String, String>(key.toString(), key.toString() + "->" + value.toString()));
>>
>>
>> all works, but of course it's not the best way to do it.
>>
>> Could someone help me?
>>
>> Thanks in advance,
>>
>> Gioacchino
