kafka-users mailing list archives

From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: question on serialization ..
Date Mon, 12 Feb 2018 10:16:24 GMT
Thanks a lot for the clear answer.

One of my concerns is that it's not always obvious when the default
serializers are used. For example, it looks like KGroupedStream#reduce also
uses the default serializer under the hood. If the default serializer is
wrong, you only find out at runtime, through serialization/deserialization
errors (ClassCastException etc.), which are quite hard to track down.
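The Scala sketch below shows why this failure mode only surfaces at runtime. It does not use the Kafka API. `ToySerde`, `StringToySerde`, `LongToySerde`, `resolve` and `trySerialize` are hypothetical stand-ins. The sketch assumes the rule Matthias describes further down: an operator uses a Serde passed in as a parameter, and otherwise falls back to the untyped config default through an unchecked cast. Because of type erasure, a String default wired to a Long value compiles without complaint. It fails with a ClassCastException only when a value actually flows through it.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object SerdeSketch {
  // Hypothetical stand-in for a Kafka Serde; NOT the real Kafka API.
  trait ToySerde[T] {
    def serialize(value: T): Array[Byte]
    def deserialize(bytes: Array[Byte]): T
  }

  object StringToySerde extends ToySerde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  object LongToySerde extends ToySerde[Long] {
    def serialize(value: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(value).array()
    def deserialize(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong()
  }

  // Assumed resolution rule: an explicitly passed serde wins; otherwise the
  // untyped config default is cast to the expected type with no runtime check.
  def resolve[T](explicit: Option[ToySerde[T]], default: ToySerde[_]): ToySerde[T] =
    explicit.getOrElse(default.asInstanceOf[ToySerde[T]])

  // Right(bytes) on success; Left(error) if the (wrong) serde rejects the value.
  def trySerialize[T](serde: ToySerde[T], value: T): Either[Throwable, Array[Byte]] =
    try Right(serde.serialize(value))
    catch { case e: ClassCastException => Left(e) }
}
```

With an explicit `LongToySerde` the round trip works. With only the String default, the cast "succeeds" and the error appears on the first `serialize` call. In real Kafka Streams code, the corresponding remedy is to pass the Serdes to the operator explicitly (for example, via `Consumed` when reading a stream) instead of relying on the config default.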

On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matthias@confluent.io> wrote:

> For stream-table-join, only the table is (de)serialized; the stream side
> is only piped through and does lookups into the table.
>
> And when reading the stream
> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129),
> the Serdes from the config are overridden by the parameters passed into
> `#stream()`.
>
> The default Serdes are used when reading from or writing to a topic or store
> (including repartition and changelog topics), unless the operator overrides
> the default Serdes via passed-in parameters.
>
>
> -Matthias
>
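The hypothetical Scala sketch below illustrates the stream-table join behavior described above. It is not Kafka Streams internals: `JoinSketch`, `putRegion`, `leftJoin`, `byRegion` and the byte-level `regionStore` are assumptions made for illustration. Only the table side goes through a serde, because the table is stored as bytes the way a state store would be. Stream records of type `(String, Long)` only perform lookups. The joined `(region, clicks)` tuple is handed to the next step in memory and never serialized. Per the rule above, only a later step that writes `[String, Long]` to a repartition topic or store would need a Long value Serde.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

object JoinSketch {
  // Counts how often the table-side serde is exercised.
  var serdeCalls = 0

  // Hypothetical String serde, used only for the table side.
  def encode(s: String): Array[Byte] = { serdeCalls += 1; s.getBytes(UTF_8) }
  def decode(b: Array[Byte]): String = { serdeCalls += 1; new String(b, UTF_8) }

  // Table side: user -> region, kept as bytes like a state store.
  val regionStore = mutable.Map.empty[String, Array[Byte]]
  def putRegion(user: String, region: String): Unit =
    regionStore(user) = encode(region)

  // Stream side: (user, clicks) records only look up the table and are never
  // serialized. A missing table entry becomes "UNKNOWN", mirroring the null
  // check in the joiner quoted in this thread.
  def leftJoin(clicks: Seq[(String, Long)]): Seq[(String, (String, Long))] =
    clicks.map { case (user, n) =>
      val region = regionStore.get(user).map(decode).getOrElse("UNKNOWN")
      (user, (region, n))
    }

  // Mirrors the subsequent map step: re-key by region, giving [String, Long].
  def byRegion(joined: Seq[(String, (String, Long))]): Seq[(String, Long)] =
    joined.map { case (_, (region, n)) => (region, n) }
}
```

The serde counter only grows when the table is written or read. The tuple values produced by `leftJoin` pass to `byRegion` as ordinary in-memory objects.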
> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > The inputs to the leftJoin are the stream with [String, Long] and the
> > table with [String, String]. Is the default serializer (I mean the one
> > from the config) used for [String, String]? Then how does the
> > [String, Long] serialization work?
> >
> > I guess the basic issue I am trying to understand is how the default
> > serializers (stringSerde, stringSerde) registered in the config are used
> > for serializing the inputs of leftJoin.
> >
> > regards.
> >
> > On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> >> userClicksJoinRegion is never serialized.
> >>
> >> It is the result of the join, and the join only (de)serializes its input
> >> in the internal stores.
> >>
> >> The output is forwarded in-memory to the subsequent map, which returns
> >> `clicksByRegion` of type [String, Long].
> >>
> >>
> >> -Matthias
> >>
> >> On 2/10/18 1:17 PM, Ted Yu wrote:
> >>> Please read the javadoc:
> >>>
> >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> >>>
> >>> and correlate with the sample code.
> >>>
> >>> Thanks
> >>>
> >>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
> >>>
> >>>> Looking at
> >>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> >>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
> >>>> which means the value is a tuple of (String, Long). I can't see how
> >>>> this will serialize/deserialize with the default serializers, which are
> >>>> stringSerde for both keys and values.
> >>>>
> >>>> Or am I missing something?
> >>>>
> >>>> regards.
> >>>>
> >>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> >>>>
> >>>>> If I read the code correctly, the operation on this line prepares the
> >>>>> input for the (stringSerde, stringSerde) specified on line 142:
> >>>>>
> >>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> >>>>>         (if (region == null) "UNKNOWN" else region, clicks))
> >>>>>
> >>>>> FYI
> >>>>>
> >>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debasish@gmail.com> wrote:
> >>>>>
> >>>>>> Hi -
> >>>>>>
> >>>>>> I was going through this example at
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> >>>>>> especially the leftJoin part at
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> >>>>>> This leftJoin returns KStream[String, (String, Long)], while the
> >>>>>> default serializers are String for both key and value, as in
> >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> >>>>>> My question is: how does serialization work here? How does the
> >>>>>> tuple get serialized with the default serializers? And leftJoin only
> >>>>>> works with the default serializers.
> >>>>>>
> >>>>>> regards.
> >>>>>>
> >>>>>> --
> >>>>>> Debasish Ghosh
> >>>>>> http://manning.com/ghosh2
> >>>>>> http://manning.com/ghosh
> >>>>>>
> >>>>>> Twttr: @debasishg
> >>>>>> Blog: http://debasishg.blogspot.com
> >>>>>> Code: http://github.com/debasishg
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>


