kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: KStreams to KTable join
Date Thu, 06 Jul 2017 05:14:30 GMT
When a KStream or KTable is created from a source topic, each of its
records is a key-value pair, and the key is read from Kafka as the
message key.

What you showed in JSON seems to be only the value of the message, so
I'm asking what the key of the message is, since that will be the key of
the streams record.
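
To make the key/value distinction concrete, here is a minimal sketch in
plain Java. It is only a model of the join behaviour described in this
thread, not the Kafka Streams API: the class KeyedJoinSketch, its Record
type, and the leftJoin helper are hypothetical names made up for
illustration, and the key "joe" is assumed from the example in the quoted
message below. The table is modelled as a map from message key to value,
stream records with a null key are dropped, and a stream record whose key
has no match in the table is joined with null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a key-based stream-to-table left join.
// Hypothetical illustration only; this is NOT the Kafka Streams API.
final class KeyedJoinSketch {

    // A record is a (key, value) pair. The key plays the role of the
    // Kafka message key; the value is the JSON payload.
    static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Looks up each stream record in the table by key.
    // Null-keyed records are skipped because they cannot be matched.
    // Unmatched keys are still emitted, with a null table-side value.
    static List<String> leftJoin(List<Record> stream, Map<String, String> table) {
        List<String> joined = new ArrayList<>();
        for (Record r : stream) {
            if (r.key == null) {
                continue; // dropped: no key to join on
            }
            String tableValue = table.get(r.key); // null when there is no match
            joined.add(r.key + ": " + r.value + " + " + tableValue);
        }
        return joined;
    }

    public static void main(String[] args) {
        // Table side: message key -> JSON value (key "joe" is an assumption).
        Map<String, String> table = new HashMap<>();
        table.put("joe", "{\"user_name\": \"Joe\", \"location\": \"US\", \"gender\": \"male\"}");

        String event = "{\"user\": \"Joe\", \"custom\": {\"choice\":\"vegan\"}}";
        List<Record> stream = Arrays.asList(
                new Record(null, event),   // value only, no key: dropped
                new Record("joe", event)); // keyed: joined with the table entry

        for (String line : leftJoin(stream, table)) {
            System.out.println(line);
        }
    }
}
```

Note that in a left join the table-side value can be null, so a joiner
should guard against that before dereferencing it. If the topic only
carries the JSON value, the stream can be re-keyed from a field of the
value before the join, for example with KStream#selectKey.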


Guozhang

On Thu, Jun 29, 2017 at 2:34 PM, Shekar Tippur <ctippur@gmail.com> wrote:

> Guozhang,
>
> "1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream."
>
> Can you please elaborate on the notion of key? By keys, do you mean kafka
> partition keys?
> For a json kstream to ktable example, can you please show me a sample
> input?
>
> For me, the ktable has:
>
> {"user_name": "Joe", "location": "US", "gender": "male"}
> {"user_name": "Julie", "location": "US", "gender": "female"}
>
> {"user_name": "Kawasaki", "location": "Japan", "gender": "male"}
>
> The kstream gets an event (KStreams)
>
> {"user": "Joe", "custom": {"choice":"vegan"}}
>
> Is this data right or do I need to have a key and then a json - as in:
>
>
> "joe", {"user_name": "Joe", "location": "US", "gender": "male"}
>
>
>
>
> On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > I think your issue is twofold:
> >
> > 1) if the coming record's key is null, then when it flows into the join
> > processor inside the topology this record will be dropped as it cannot be
> > joined with any records from the other stream.
> >
> > 2) the NPE you are getting when giving it the non-null keyed record seems
> > to be because you are using "SnowServerDeserialzer" (is it set as the
> > default key deserializer?), which expects a SnowServerPOJOClass while the
> > key "joe" is typed String. You need to override the key deserializer when
> > constructing the "cache" KTable as well:
> >
> > ----------------
> > KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
> > rawSerde, "cache", "local-cache");
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ctippur@gmail.com> wrote:
> >
> > > Guozhang
> > >
> > > I am using 0.10.2.1 version
> > >
> > > - Shekar
> > >
> > > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Hi Shekar,
> > > >
> > > > Could you show your input data? More specifically, what are the key
> > > > types of your input streams, and are the keys non-null? It seems the
> > > > root cause is similar to the other thread you asked on the mailing list.
> > > >
> > > > Also, could you provide the Kafka Streams version you are using?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ctippur@gmail.com> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I am having trouble implementing a stream-to-table join.
> > > > >
> > > > > I have 2 POJOs, one representing the stream data structure and one the
> > > > > table data structure. The raw topic contains the stream records and the
> > > > > cache topic contains the table records. The join does not seem to be
> > > > > happening, since the print statement is never called.
> > > > >
> > > > > Appreciate any pointers.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > > CachePOJOClass,RawPOJOClass>() {
> > > > >
> > > > >     @Override
> > > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > > >
> > > > >         String src=r.getSource();
> > > > >         String cSrc=c.getSnowHost();
> > > > >         Custom custom=new Custom();
> > > > >
> > > > >         if (src.matches(snowSrc)){
> > > > >             System.out.println("In apply code");
> > > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > > >             r.setCustom(custom);
> > > > >         }
> > > > >         return r;
> > > > >     }
> > > > > }).to("parser");
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
