kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: question on serialization ..
Date Tue, 13 Feb 2018 08:54:51 GMT
There is an overload `leftJoin(KTable, ValueJoiner, Joined)`.

Joined is where you specify the Serde for the KTable and for the resulting
type. We don't need the Serde for the stream at this point as the value has
already been deserialized.
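
To illustrate Damian's point, here is a toy, pure-Scala model of a stream-table left join. It is not the Kafka Streams API: `TableValueSerde`, `TableStore`, and `leftJoinRecord` are hypothetical names, and a mutable map of byte arrays stands in for the table's state store. Only the table values pass through a serde, because they are stored as bytes. The stream value arrives already deserialized, and the joined tuple is an ordinary in-memory object.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Hypothetical toy model of a stream-table left join; not the Kafka Streams API.
object ToyStreamTableJoin {

  // Stands in for the table's value serde: the only (de)serialization in this model.
  object TableValueSerde {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // The table side is kept as bytes, like a state store backed by a changelog topic.
  final class TableStore {
    private val data = mutable.Map.empty[String, Array[Byte]]

    def put(key: String, value: String): Unit =
      data(key) = TableValueSerde.serialize(value)

    // Returns null for a missing key, which is what a left-join joiner sees.
    def get(key: String): String =
      data.get(key).map(TableValueSerde.deserialize).orNull

    def storedBytes(key: String): Int = data.get(key).map(_.length).getOrElse(0)
  }

  // The stream value (clicks) is already a Long: no serde is used for it, and the
  // joined (region, clicks) tuple is handed to the next operator as a plain object.
  def leftJoinRecord(store: TableStore, user: String, clicks: Long): (String, Long) = {
    val region = store.get(user)
    (if (region == null) "UNKNOWN" else region, clicks)
  }
}
```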

HTH,
Damian

On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh <ghosh.debasish@gmail.com>
wrote:

> Regarding “has an according overload”, I agree. But some operators, like
> reduce and leftJoin, use the serdes implicitly from the config. So if the
> developer is not careful enough to get the default serdes right, the
> result is a runtime error.
>
> Also, one more point of confusion on my part: in the config we can give
> only one serde for keys and one for values. What happens if I have two
> leftJoins in my transformation that need different serdes than the ones
> in the config? There is no overload for leftJoin that allows me to provide
> a serde. Or am I missing something?
>
> regards.
>
> On Tue, 13 Feb 2018 at 12:14 AM, Matthias J. Sax <matthias@confluent.io>
> wrote:
>
> > Each operator that needs to use a Serde has an according overload
> > method that allows you to override the Serde. If you don't override
> > it, the operator uses the Serde from the config.
> >
> > > If one gets the default
> > >> serializer wrong then she gets run time errors in serialization /
> > >> de-serialization (ClassCastException etc.)
> >
> > Default Serdes are helpful if you use a generic format like Avro
> > throughout the whole topology. If you have many different types, it might
> > be better to set the default Serdes to `null` and set the Serde for each
> > operator individually.
> >
> >
> > -Matthias
> >
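
The override-or-default rule above, and the ClassCastException Debasish describes, can be reproduced with a small toy model. `ToySerde`, `ToyStringSerde`, `ToyLongSerde`, and `resolveSerde` are hypothetical stand-ins, not Kafka's `Serde` interface. An explicitly passed serde wins; otherwise the config default, which carries no static type, is cast to the required type, so a wrong default only fails once a value is actually serialized.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical toy model; Kafka's real interface is org.apache.kafka.common.serialization.Serde.
trait ToySerde[T] {
  def serialize(value: T): Array[Byte]
}

object ToyStringSerde extends ToySerde[String] {
  def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
}

object ToyLongSerde extends ToySerde[Long] {
  def serialize(value: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(value).array()
}

object SerdeResolution {
  // An explicit serde (the "overload" case) wins; otherwise fall back to the config default.
  // The default has no static type, so it is cast unchecked: a mismatch is not detected here.
  def resolveSerde[T](explicit: Option[ToySerde[T]], configDefault: ToySerde[_]): ToySerde[T] =
    explicit.getOrElse(configDefault.asInstanceOf[ToySerde[T]])
}
```

Passing a serde explicitly, as suggested for topologies with many different types, corresponds to the `Some(...)` branch, so the unchecked cast never happens. With a mismatched default, the error surfaces only at the first `serialize` call, which is why it is hard to trace back to the configuration.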
> > On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> > > Thanks a lot for the clear answer.
> > >
> > > One of the concerns that I have is that it's not always obvious when the
> > > default serializers are used. E.g., it looks like KGroupedStream#reduce
> > > also uses the default serializer under the hood. If one gets the default
> > > serializer wrong then she gets run time errors in serialization /
> > > de-serialization (ClassCastException etc.), which are quite hard to track
> > > down.
> > >
> > > On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matthias@confluent.io>
> > > wrote:
> > >
> > >> For a stream-table join, only the table is (de)serialized; the stream
> > >> side is only piped through and does lookups into the table.
> > >>
> > >> And when reading the stream
> > >> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129),
> > >> the Serdes from the config are overridden by the parameters passed into
> > >> `#stream()`.
> > >>
> > >> The default Serdes are used when reading/writing from/to a topic/store
> > >> (including repartition and changelog topics), if the operator does not
> > >> override the default Serdes via passed-in parameters.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > >>> The inputs to the leftJoin are the stream with [String, Long] and the
> > >>> table with [String, String]. Is the default serializer (I mean the one
> > >>> from the config) used for [String, String]? Then how does the
> > >>> [String, Long] serialization work?
> > >>>
> > >>> I guess the basic issue I am trying to understand is how the default
> > >>> serializers (stringSerde, stringSerde) registered in the config are used
> > >>> for serializing the inputs of leftJoin.
> > >>>
> > >>> regards.
> > >>>
> > >>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matthias@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> userClicksJoinRegion is never serialized...
> > >>>>
> > >>>> It is the result of the join, and the join only (de)serializes its input
> > >>>> in the internal stores.
> > >>>>
> > >>>> The output is forwarded in memory to the following map, which returns
> > >>>> `clicksByRegion` of type [String, Long].
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
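
Matthias's explanation can be sketched with plain Scala collections standing in for the topology. This is a toy model, not Kafka Streams, and `InMemoryJoinChain`, `clicksByRegion`, and `totalsByRegion` are hypothetical names. The joined `(region, clicks)` values exist only between in-memory steps; the re-keyed `[String, Long]` records are what a later grouping step or output topic would have to serialize.

```scala
// Toy model of the operators after the join; not the Kafka Streams API.
object InMemoryJoinChain {

  // userClicks: already-deserialized stream records (user, clicks).
  // userRegions: a plain Map standing in for the table's store (user -> region).
  def clicksByRegion(
      userClicks: Seq[(String, Long)],
      userRegions: Map[String, String]): Seq[(String, Long)] =
    userClicks
      // leftJoin: value becomes a (region, clicks) tuple; a missing region becomes "UNKNOWN"
      .map { case (user, clicks) => (user, (userRegions.getOrElse(user, "UNKNOWN"), clicks)) }
      // map: re-key by region; the tuple was never serialized, only passed along in memory
      .map { case (_, (region, clicks)) => (region, clicks) }

  // Stands in for grouping by region and reducing; in Kafka Streams this is the kind of
  // step that writes to a repartition topic and a store, and therefore needs serdes.
  def totalsByRegion(records: Seq[(String, Long)]): Map[String, Long] =
    records.groupBy(_._1).map { case (region, rs) => region -> rs.map(_._2).sum }
}
```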
> > >>>> On 2/10/18 1:17 PM, Ted Yu wrote:
> > >>>>> Please read the javadoc:
> > >>>>>
> > >>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> > >>>>>
> > >>>>> and correlate with the sample code.
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Looking at
> > >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> > >>>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
> > >>>>>> which means the value is a tuple of (String, Long). I am not able to see
> > >>>>>> how this will be serialized/deserialized with the default serializers,
> > >>>>>> which are stringSerde for both keys and values.
> > >>>>>>
> > >>>>>> Or am I missing something?
> > >>>>>>
> > >>>>>> regards.
> > >>>>>>
> > >>>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhihong@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> If I read the code correctly, the operation on this line prepares the
> > >>>>>>> input for the (stringSerde, stringSerde) specified on line 142:
> > >>>>>>>
> > >>>>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> > >>>>>>>         (if (region == null) "UNKNOWN" else region, clicks))
> > >>>>>>>
> > >>>>>>> FYI
> > >>>>>>>
> > >>>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debasish@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi -
> > >>>>>>>>
> > >>>>>>>> I was going through this example at
> > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> > >>>>>>>> especially the leftJoin part
> > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> > >>>>>>>> This leftJoin returns KStream[String, (String, Long)], while the default
> > >>>>>>>> serializers are String for both key and value, as in
> > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> > >>>>>>>> My question is: how does the serialization work here? I mean, how does
> > >>>>>>>> the tuple get serialized with the default serializers? And leftJoin only
> > >>>>>>>> works with default serializers.
> > >>>>>>>>
> > >>>>>>>> regards.
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> Debasish Ghosh
> > >>>>>>>> http://manning.com/ghosh2
> > >>>>>>>> http://manning.com/ghosh
> > >>>>>>>>
> > >>>>>>>> Twttr: @debasishg
> > >>>>>>>> Blog: http://debasishg.blogspot.com
> > >>>>>>>> Code: http://github.com/debasishg
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>> --
> > >>> Sent from my iPhone
> > >>>
> > >>
> > >>
> > >
> > >
> >
>
