kafka-users mailing list archives

From Debasish Ghosh <ghosh.debas...@gmail.com>
Subject Re: question on serialization ..
Date Sun, 11 Feb 2018 06:34:58 GMT
The inputs to the leftJoin are the stream with [String, Long] and the table
with [String, String]. Is the default serializer (I mean from the config)
used for [String, String] ? Then how does the [String, Long] serialization
work ?

I guess the basic issue that I am trying to understand is how the default
serialisers (stringSerde, stringSerde) registered in the config are used for
serialising the inputs of leftJoin ..

regards.

On Sun, 11 Feb 2018 at 8:53 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> userClicksJoinRegion is never serialized...
>
> It is the result of the join, and the join only (de)serializes its input in
> the internal stores.
>
> The output is forwarded in memory to the subsequent map, which returns
> `clicksByRegion` of type [String,Long].
>
>
> -Matthias
>
> On 2/10/18 1:17 PM, Ted Yu wrote:
> > Please read the javadoc:
> >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
> >
> > and correlate with the sample code.
> >
> > Thanks
> >
> > On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh <ghosh.debasish@gmail.com>
> > wrote:
> >
> >> Looking at
> >> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
> >> it seems that the leftJoin generates a KStream[String, (String, Long)],
> >> which means the value is a tuple of (String, Long) .. I am not able to get
> >> how this will serialize/de-serialize with the default serializers which are
> >> both stringSerde for keys and values.
> >>
> >> or am I missing something ?
> >>
> >> regards.
> >>
> >> On Sun, Feb 11, 2018 at 2:30 AM, Ted Yu <yuzhihong@gmail.com> wrote:
> >>
> >>> If I read the code correctly, the operation on this line prepares the
> >>> input for the (stringSerde, stringSerde) specified on line 142:
> >>>
> >>>       .leftJoin(userRegionsTable, (clicks: Long, region: String) =>
> >>>         (if (region == null) "UNKNOWN" else region, clicks))
> >>>
> >>> FYI
> >>>
> >>> On Sat, Feb 10, 2018 at 11:00 AM, Debasish Ghosh <ghosh.debasish@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi -
> >>>>
> >>>> I was going through this example at
> >>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
> >>>> especially the leftJoin part
> >>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L156.
> >>>> This leftJoin returns KStream[String, (String, Long)], while default
> >>>> serializers are String for both key and value as in
> >>>> https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L112-L113.
> >>>> My question is how does this serialization work here ? I mean how does
> >>>> the tuple get serialized with the default serializers ? And leftJoin only
> >>>> works with default serializers ..
> >>>>
> >>>> regards.
> >>>>
> >>>> --
> >>>> Debasish Ghosh
> >>>> http://manning.com/ghosh2
> >>>> http://manning.com/ghosh
> >>>>
> >>>> Twttr: @debasishg
> >>>> Blog: http://debasishg.blogspot.com
> >>>> Code: http://github.com/debasishg
> >>>>
> >>>
> >
>
> --
Sent from my iPhone
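
A minimal sketch of the point made in the thread, assuming a toy model rather
than the real Kafka Streams API: ToySerde, ToySerdes and JoinSerdeSketch are
hypothetical names, and the sample records are made up for illustration. Bytes
are decoded once at the two sources with explicit [String, Long] and
[String, String] serdes, the leftJoin result (the tuple) exists only as
in-memory objects, and bytes are produced again only at the sink, after the map
has turned the value back into a Long. So no serde for (String, Long) is ever
needed.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for a serde. This is NOT the Kafka Streams Serde interface.
trait ToySerde[A] {
  def serialize(a: A): Array[Byte]
  def deserialize(bytes: Array[Byte]): A
}

object ToySerdes {
  val stringSerde: ToySerde[String] = new ToySerde[String] {
    def serialize(a: String): Array[Byte] = a.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }
  val longSerde: ToySerde[Long] = new ToySerde[Long] {
    def serialize(a: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(a).array()
    def deserialize(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
  }
}

object JoinSerdeSketch {
  import ToySerdes.{longSerde, stringSerde}

  def run(): Seq[(String, Long)] = {
    // Raw bytes as they would sit in the two input topics (made-up sample data).
    val clicksTopic: Seq[(Array[Byte], Array[Byte])] =
      Seq("alice" -> 13L, "bob" -> 4L, "alice" -> 40L, "eve" -> 7L)
        .map { case (k, v) => (stringSerde.serialize(k), longSerde.serialize(v)) }
    val regionsTopic: Seq[(Array[Byte], Array[Byte])] =
      Seq("alice" -> "asia", "bob" -> "americas")
        .map { case (k, v) => (stringSerde.serialize(k), stringSerde.serialize(v)) }

    // Source 1: the stream is read with explicit [String, Long] serdes.
    val userClicks: Seq[(String, Long)] =
      clicksTopic.map { case (k, v) => (stringSerde.deserialize(k), longSerde.deserialize(v)) }

    // Source 2: the table is read with [String, String] serdes.
    // (A real table would also live in a state store; the model keeps a plain Map.)
    val userRegions: Map[String, String] =
      regionsTopic.map { case (k, v) => (stringSerde.deserialize(k), stringSerde.deserialize(v)) }.toMap

    // leftJoin: works on already-deserialized objects. The (String, Long) tuple
    // is created here and is never turned into bytes.
    val userClicksJoinRegion: Seq[(String, (String, Long))] =
      userClicks.map { case (user, clicks) =>
        (user, (userRegions.getOrElse(user, "UNKNOWN"), clicks))
      }

    // map: re-key by region and drop the tuple, giving [String, Long] again.
    val clicksByRegion: Seq[(String, Long)] =
      userClicksJoinRegion.map { case (_, (region, clicks)) => (region, clicks) }

    // Sum the clicks per region.
    val totals: Map[String, Long] =
      clicksByRegion.groupBy(_._1).map { case (region, xs) => region -> xs.map(_._2).sum }

    // Sink: only here are bytes produced again, with [String, Long] serdes.
    val outputTopic: Seq[(Array[Byte], Array[Byte])] =
      totals.toSeq.sortBy(_._1)
        .map { case (k, v) => (stringSerde.serialize(k), longSerde.serialize(v)) }

    // Decode the output bytes to show that they round-trip.
    outputTopic.map { case (k, v) => (stringSerde.deserialize(k), longSerde.deserialize(v)) }
  }

  def main(args: Array[String]): Unit =
    run().foreach { case (region, clicks) => println(s"$region -> $clicks") }
}
```

In the real example the same roles are played by the serdes passed when the
stream and the table are created, which is what the Consumed javadoc linked
above describes. The config defaults only apply where no serde is given
explicitly.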
