kafka-users mailing list archives

From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: question on serialization ..
Date Tue, 13 Feb 2018 14:36:06 GMT
Ah, ok, thanks for the clarification. For reduce, I guess the overload
with Materialized does the same thing.

regards.
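The selection rule described further down the thread is simple: an operator's explicitly passed Serde wins, otherwise the default from the config is used. It can be modelled in a few lines of plain Scala. This is a hypothetical sketch, not Kafka Streams code. `SerdeConfig`, `resolveValueSerde` and the serde names are illustrative stand-ins, and how the real library reports a missing serde depends on the version.

```scala
// Hypothetical model of serde selection; NOT the Kafka Streams API.
// Serdes are represented by their names to keep the sketch self-contained.
object SerdeResolutionSketch {

  // Stand-in for the default value serde in the streams config (None plays the role of `null`).
  final case class SerdeConfig(defaultValueSerde: Option[String])

  // An explicitly passed serde (e.g. via Materialized or Joined in the thread) takes precedence.
  // Otherwise fall back to the configured default. With no default at all, fail loudly.
  def resolveValueSerde(operator: String, explicit: Option[String], config: SerdeConfig): String =
    explicit
      .orElse(config.defaultValueSerde)
      .getOrElse(throw new IllegalStateException(
        s"$operator: no value serde passed in and no default configured"))

  def main(args: Array[String]): Unit = {
    val withDefault = SerdeConfig(defaultValueSerde = Some("stringSerde"))
    val noDefault = SerdeConfig(defaultValueSerde = None)

    // Two leftJoins can use different serdes by passing them explicitly.
    println(resolveValueSerde("leftJoin#1", Some("longSerde"), withDefault))   // longSerde
    println(resolveValueSerde("leftJoin#2", Some("regionSerde"), withDefault)) // regionSerde
    // A reduce without an explicit serde silently picks up the default.
    println(resolveValueSerde("reduce", None, withDefault))                    // stringSerde
    // With no default configured, the same omission is reported immediately.
    try resolveValueSerde("reduce", None, noDefault)
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Modelling "no default" as `None` mirrors the suggestion in the thread to set the default Serdes to `null` when a topology uses many types: a forgotten per-operator serde then fails at the point of lookup instead of producing a confusing error later.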

On Tue, Feb 13, 2018 at 2:24 PM, Damian Guy <damian.guy@gmail.com> wrote:

> There is an overload `leftJoin(KTable, ValueJoiner, Joined)`.
>
> Joined is where you specify the Serde for the KTable and for the resulting
> type. We don't need the Serde for the stream at this point as the value has
> already been deserialized.
>
> HTH,
> Damian
>
> On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh <ghosh.debasish@gmail.com>
> wrote:
>
> > Regarding “has an according overload”, I agree. But some operators like
> > reduce and leftJoin implicitly pick up the serdes from the config. So if
> > the developer is not careful enough to get the default serdes right, then
> > it results in a runtime error.
> >
> > Also, one more confusion on my part: in the config we can give only one
> > serde for the key and one for the value. What happens if I have 2 leftJoins
> > in my transformation that need serdes different from the ones in the
> > config? There is no overload for leftJoin that allows me to provide a
> > serde. Or am I missing something?
> >
> > regards.
> >
> > On Tue, 13 Feb 2018 at 12:14 AM, Matthias J. Sax <matthias@confluent.io>
> > wrote:
> >
> > > Each operator that needs to use a Serde has an according overload
> > > method that allows you to override the Serde. If you don't override
> > > it, the operator uses the Serde from the config.
> > >
> > > > If one gets the default
> > > > serializer wrong then she gets run time errors in serialization /
> > > > de-serialization (ClassCastException etc.)
> > >
> > > Default Serdes are helpful if you use a generic format like Avro
> > > throughout the whole topology. If you have many different types, it might
> > > be better to set the default Serdes to `null` and set the Serde for each
> > > operator individually.
> > >
> > >
> > > -Matthias
> > >
> > > On 2/12/18 2:16 AM, Debasish Ghosh wrote:
> > > > Thanks a lot for the clear answer.
> > > >
> > > > One of the concerns that I have is that it's not always obvious when the
> > > > default serializers are used. E.g., it looks like KGroupedStream#reduce
> > > > also uses the default serializer under the hood. If one gets the default
> > > > serializer wrong then she gets run time errors in serialization /
> > > > de-serialization (ClassCastException etc.), which are quite hard to track
> > > > down.
> > > >
> > > > On Mon, Feb 12, 2018 at 4:52 AM, Matthias J. Sax <matthias@confluent.io>
> > > > wrote:
> > > >
> > > >> For stream-table-join, only the table is (de)serialized; the stream side
> > > >> is only piped through and does lookups into the table.
> > > >>
> > > >> And when reading the stream
> > > >> (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L129)
> > > >> the Serdes from the config are overridden by parameters passed into
> > > >> `#stream()`.
> > > >>
> > > >> The default Serdes are used when reading/writing from/to a topic/store
> > > >> (including repartition and changelog topics), unless the operator
> > > >> overrides them via passed-in parameters.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 2/10/18 10:34 PM, Debasish Ghosh wrote:
> > > >>> The inputs to the leftJoin are the stream with [String, Long] and the
> > > >>> table with [String, String]. Is the default serializer (I mean the one
> > > >>> from the config) used for [String, String]? Then how does the
> > > >>> [String, Long] serialization work?
> > > >>>
> > > >>> I guess the basic issue that I am trying to understand is how the default
> > > >>> serialisers (stringSerde, stringSerde) registered in the config are used
> > > >>> for serialising the inputs of leftJoin.
> > > >>>
> > > >>> regards.
> > > >>>
> > > >>> On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matthias@confluent.io>
> > > >>> wrote:
> > > >>>
> > > >>>> userClicksJoinRegion is never serialized...
> > > >>>>
> > > >>>> It is the result of the join, and the join only (de)serializes its input
> > > >>>> in the internal stores.
> > > >>>>
> > > >>>> The output is forwarded in-memory to a downstream map that returns
> > > >>>> `clicksByRegion`, which is [String, Long].
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 2/10/18 1:17 PM, Ted Yu wrote:
> > > >>>>> Please read the javadoc:
> > > >>>>>
> > > >>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> > > >>>>>
> > > >>>>> and correlate with the sample code.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Looking at
> > > >>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> > > >>>>>> it seems that the leftJoin generates a KStream[String, (String, Long)],
> > > >>>>>> which means the value is a tuple of (String, Long). I am not able to see
> > > >>>>>> how this will serialize/de-serialize with the default serializers, which
> > > >>>>>> are both stringSerde for keys and values.
> > > >>>>>>
> > > >>>>>> Or am I missing something?
> > > >>>>>>
> > > >>>>>> regards.
> > > >>>>>>
> > > >>>>>> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> If I read the code correctly, the operation on this line prepares the
> > > >>>>>>> input for the (stringSerde, stringSerde) specified on line 142:
> > > >>>>>>>
> > > >>>>>>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> > > >>>>>>>         (if (region == null) "UNKNOWN" else region, clicks))
> > > >>>>>>>
> > > >>>>>>> FYI
> > > >>>>>>>
> > > >>>>>>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debasish@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi -
> > > >>>>>>>>
> > > >>>>>>>> I was going through this example at
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> > > >>>>>>>> especially the leftJoin part
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> > > >>>>>>>> This leftJoin returns KStream[String, (String, Long)], while the default
> > > >>>>>>>> serializers are String for both key and value, as in
> > > >>>>>>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> > > >>>>>>>> My question is: how does this serialization work here? I mean, how does
> > > >>>>>>>> the tuple get serialized with the default serializers? And leftJoin only
> > > >>>>>>>> works with default serializers.
> > > >>>>>>>>
> > > >>>>>>>> regards.
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Debasish Ghosh
> > > >>>>>>>> http://manning.com/ghosh2
> > > >>>>>>>> http://manning.com/ghosh
> > > >>>>>>>>
> > > >>>>>>>> Twttr: @debasishg
> > > >>>>>>>> Blog: http://debasishg.blogspot.com
> > > >>>>>>>> Code: http://github.com/debasishg
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>> --
> > > >>> Sent from my iPhone
> > > >>>
> > > >>
> > > >>
> > > >
> > > >
> > >
> >
>
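The answer to the original question (the joined (String, Long) tuple is only forwarded in memory to the next operator, so no tuple serde is ever needed) can be illustrated with plain Scala collections. This is a hypothetical simulation, not the Kafka Streams API. The sample users, regions and click counts are made up, and `serializeString`/`serializeLong` merely stand in for stringSerde and longSerde.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, in-memory simulation of the example's join; NOT the Kafka Streams API.
object ClicksJoinSketch {

  // Stand-ins for stringSerde / longSerde: the only serialization in this sketch.
  def serializeString(s: String): Array[Byte] = s.getBytes(UTF_8)
  def serializeLong(l: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(l).array()

  // Table side (user -> region). Per the thread, this is the side read through serdes.
  val userRegions: Map[String, String] = Map("alice" -> "asia", "bob" -> "europe")

  // Stream side (user -> clicks), already deserialized by the source.
  val userClicks: Seq[(String, Long)] =
    Seq("alice" -> 13L, "bob" -> 4L, "chao" -> 25L, "alice" -> 2L)

  // leftJoin: each value becomes a (region, clicks) tuple. The tuple is only
  // passed on in memory, so no serde for the tuple type is required.
  val userClicksJoinRegion: Seq[(String, (String, Long))] =
    userClicks.map { case (user, clicks) =>
      val region = userRegions.get(user).orNull // left join: null when no table match
      (user, (if (region == null) "UNKNOWN" else region, clicks))
    }

  // The downstream map re-keys by region, producing plain (String, Long) records.
  val clicksByRegion: Seq[(String, Long)] =
    userClicksJoinRegion.map { case (_, (region, clicks)) => (region, clicks) }

  // Only records leaving the in-memory pipeline are serialized, and they are (String, Long).
  val written: Seq[(Array[Byte], Array[Byte])] =
    clicksByRegion.map { case (region, clicks) => (serializeString(region), serializeLong(clicks)) }

  def main(args: Array[String]): Unit =
    clicksByRegion.foreach(println)
}
```

The types carry the argument: a serializer is only ever applied to `clicksByRegion`, whose element type is `(String, Long)`, never to `userClicksJoinRegion`.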



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
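The concern raised in the thread, that a wrong default serializer shows up as a ClassCastException at run time rather than as a compile error, comes down to type erasure. A default serde is created from a class name in the config, so the compiler cannot check its type against the records that reach it. The sketch below reproduces the effect with its own hypothetical `Serde` trait (not Kafka's interface); the unchecked cast stands in for that untyped lookup.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical minimal serde abstraction; Kafka's real Serde interface is richer.
object WrongDefaultSerdeSketch {

  trait Serde[T] { def serialize(value: T): Array[Byte] }

  object StringSerde extends Serde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
  }

  // Stand-in for a default serde loaded by class name: its type parameter is
  // unknown at compile time, so it is handed out as Serde[Any] (unchecked cast).
  val defaultValueSerde: Serde[Any] = StringSerde.asInstanceOf[Serde[Any]]

  // Compiles fine; the mismatch surfaces only when a non-String value is serialized.
  def writeValue(value: Any): Either[String, Array[Byte]] =
    try Right(defaultValueSerde.serialize(value))
    catch { case e: ClassCastException => Left(s"ClassCastException: ${e.getMessage}") }

  def main(args: Array[String]): Unit = {
    println(writeValue("europe").map(_.length)) // Right(6)
    println(writeValue(13L).isLeft)              // true: a Long reached the String serde
  }
}
```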
