kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: Kafka Streams - Java StreamException - Serdes issue
Date Tue, 09 Apr 2019 18:44:46 GMT
Hi Gioacchino,

If I'm understanding your topology correctly, it looks like you are doing a
reduce operation where the result is a Double.
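The plain-Java sketch below (no Kafka dependency) shows what windowedBy(TimeWindows.of(Duration.ofSeconds(10))).reduce((v1, v2) -> v1 + v2) computes: a running sum per key inside 10-second tumbling windows aligned to the epoch. The Event class, the sumPerWindow method and the "key@start/end" label are hypothetical names for illustration only. The sketch assumes non-negative timestamps. Kafka Streams itself behaves differently in one respect: it emits an updated sum downstream as each record arrives, subject to record caching, rather than a single final total per window.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSum {

    /** Hypothetical input record: key, double value, event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final double value;
        final long timestampMs;

        Event(String key, double value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Sums values per (key, window) for tumbling windows of sizeMs.
     * Assumes non-negative timestamps; window starts are aligned to the epoch.
     */
    static Map<String, Double> sumPerWindow(List<Event> events, long sizeMs) {
        Map<String, Double> sums = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % sizeMs);
            // Hypothetical label for (key, window); not Kafka's Windowed type.
            String windowKey = e.key + "@" + windowStart + "/" + (windowStart + sizeMs);
            // Same combine step as reduce((v1, v2) -> v1 + v2).
            sums.merge(windowKey, e.value, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", 1.5, 1_000L),
                new Event("a", 2.0, 9_999L),
                new Event("a", 4.0, 10_000L),
                new Event("b", 3.0, 5_000L));
        sumPerWindow(events, 10_000L).forEach((k, v) -> System.out.println(k + "->" + v));
    }
}
```

Running main prints a@0/10000->3.5, a@10000/20000->4.0 and b@0/10000->3.0, one per line. The record at 10,000 ms opens a new window, so it is not added to the first sum.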

For stateful operations, Kafka Streams uses persistent state stores to keep
track of the update stream. When using the KGroupedStream#reduce method (in
your windowed case, TimeWindowedKStream#reduce), you can pass a Materialized
instance that specifies which serdes to use when writing records to and reading
them from RocksDB. If you don't provide one, Streams uses the default serdes
specified in the config. As you have found out, the defaults do not always
match the types you need.
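The mismatch can be reproduced without Kafka. The plain-Java sketch below uses a hypothetical Serializer interface, a stand-in rather than Kafka's own serializer type. It defines a UTF-8 String serializer and a Double serializer that writes 8 big-endian bytes. When a serializer is applied through an erased type, as happens when a configured default is applied to arbitrary values, handing a Double to the String serializer fails with a ClassCastException. The StreamsException in the question appears to be Kafka Streams reporting that kind of failure.

In the question's topology, the first place this likely happens is the repartition topic that selectKey(...) followed by groupByKey() creates. If so, passing serdes there as well, for example groupByKey(Grouped.with(Serdes.String(), Serdes.Double())), may be needed in addition to the Materialized serdes. Grouped is available from Kafka 2.1 onward.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerdeMismatch {

    /** Hypothetical stand-in for a serializer interface (not Kafka's own type). */
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    /** Encodes a String as UTF-8 bytes. */
    static final Serializer<String> STRING = s -> s.getBytes(StandardCharsets.UTF_8);

    /** Encodes a Double as 8 bytes; ByteBuffer writes big-endian by default. */
    static final Serializer<Double> DOUBLE =
            d -> ByteBuffer.allocate(Double.BYTES).putDouble(d).array();

    /** Decodes the 8-byte form produced by DOUBLE. */
    static double deserializeDouble(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }

    /**
     * Applies a serializer through a raw type, the way a configured default is
     * applied to values whose static type is unknown. The only check left is the
     * cast inside the lambda, which throws ClassCastException on a mismatch.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static byte[] serializeWithDefault(Serializer serializer, Object value) {
        return serializer.serialize(value);
    }

    public static void main(String[] args) {
        Double sum = 1.5 + 2.0; // the kind of value the reduce step produces

        byte[] bytes = DOUBLE.serialize(sum);
        System.out.println("Double serializer: " + bytes.length
                + " bytes, decodes to " + deserializeDouble(bytes));

        try {
            serializeWithDefault(STRING, sum);
        } catch (ClassCastException e) {
            System.out.println("String serializer rejected the Double: "
                    + e.getClass().getSimpleName());
        }
    }
}
```

Running main prints "Double serializer: 8 bytes, decodes to 3.5", followed by "String serializer rejected the Double: ClassCastException".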

I believe that trying .reduce((v1, v2) -> v1 + v2,
Materialized.with(Serdes.String(), Serdes.Double())) should solve your issue
(I'm assuming your keys are of type String here).

HTH,
Bill

On Tue, Apr 9, 2019 at 1:14 PM Gioacchino Vino <gioacchinovino@gmail.com>
wrote:

> Hi experts,
>
>
> I believe I need to set the serde for the Double type after (or in) the
> map function, because of a repartition step.
>
> I can't figure out where to specify it. I've already tried to find the
> answer in the documentation and in articles, but without success.
>
> The following code
>
> KStream<String, Triplet<String, Double, String>> sum_data = ...
>
> KStream<String, String> aggr_stream = sum_data
>         .selectKey((key, value) -> value.getValue0())
>         .mapValues((key, value) -> value.getValue1())
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>         .reduce((v1, v2) -> v1 + v2)
>         .toStream()
>         .map((key, value) -> new KeyValue<String, String>(key.toString(),
>                 key.toString() + "->" + value.toString()));
>
> produces a StreamsException:
>
> Caused by: org.apache.kafka.streams.errors.StreamsException: A
> serializer (key: org.apache.kafka.common.serialization.StringSerializer
> / value: org.apache.kafka.common.serialization.StringSerializer) is not
> compatible to the actual key or value type (key type: java.lang.String /
> value type: java.lang.Double). Change the default Serdes in StreamConfig
> or provide correct Serdes via method parameters.
>
> This happens because the default serdes for both key and value are String.
>
> Of course, if I force the String type, for example with the following code,
>
> KStream<String, String> aggr_stream = sum_data
>         .selectKey((key, value) -> value.getValue0())
>         .mapValues((key, value) -> value.getValue1().toString())
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>         .reduce((v1, v2) -> Double.toString(Double.parseDouble(v1) + Double.parseDouble(v2)))
>         .toStream()
>         .map((key, value) -> new KeyValue<String, String>(key.toString(),
>                 key.toString() + "->" + value.toString()));
>
> everything works, but of course that's not the best way to do it.
>
> Could someone help me?
>
> Thanks in advance,
>
> Gioacchino