kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: How are custom keys compared during joins
Date Mon, 09 Dec 2019 00:21:23 GMT
Hello Sachin,

Thanks for the detailed description.

Your finding is right: for a stream-table join, table-side updates do not
trigger a join, since stream records are not "materialized" or buffered
during processing. The community has requested improving these semantics to
behave more like table-table joins, and it is indeed on the discussion
roadmap to do this soon.

At the moment I think you can consider two options: if your table records
are just late, but their timestamps are still earlier than those of the
stream records they should be joined with, you can consider configuring
max.task.idle.ms (details can be found here:
but note there's a minor bug regarding this which was just fixed in trunk, so
maybe you'd want to cherry-pick:
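As a minimal sketch of the first option (the application id and broker address are made-up values; the relevant config key in Kafka Streams is max.task.idle.ms):

```java
import java.util.Properties;

public class IdleConfigSketch {
    // Sketch: raise max.task.idle.ms so Streams waits for data on all input
    // partitions before choosing the next record by timestamp, giving late
    // table-side records a chance to be fetched and processed first.
    static Properties joinProps() {
        Properties props = new Properties();
        props.put("application.id", "stream-table-join-app"); // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");     // hypothetical broker
        props.put("max.task.idle.ms", "5000"); // wait up to 5s for lagging inputs
        return props;
    }

    public static void main(String[] args) {
        System.out.println(joinProps().getProperty("max.task.idle.ms"));
    }
}
```

Note that idling only helps when the table record's timestamp is still earlier than the stream record's; it cannot reorder records whose timestamps already sort the wrong way.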
If your table record's timestamp may even be later than the stream record's,
but you still want that record processed before the corresponding stream
record in order to fulfill the join, then you'd need to consider shifting the
table record's timestamp, so that when Streams synchronizes the joining
streams / tables, those records are picked first.
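The timestamp-shifting idea can be sketched as plain arithmetic (the 2-second shift and the record timestamps below are made-up values; in a real application this logic would live in a custom TimestampExtractor registered for the table's source topic):

```java
public class TimestampShiftSketch {
    // Sketch: shift table-side record timestamps back by a fixed offset so
    // they sort before the stream records they must join with.
    // Assumption: a 2s shift covers the observed lateness.
    static long shiftedTimestamp(long recordTs, long shiftMs) {
        return recordTs - shiftMs;
    }

    public static void main(String[] args) {
        long tableTs = 10_000L;  // table record arrives "late"
        long streamTs = 9_000L;  // stream record it should join with
        long shifted = shiftedTimestamp(tableTs, 2_000L);
        // After shifting, the table record sorts before the stream record,
        // so Streams picks it first during timestamp synchronization.
        System.out.println(shifted < streamTs);
    }
}
```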


On Sat, Dec 7, 2019 at 10:05 PM Sachin Mittal <sjmittal@gmail.com> wrote:

> I figured out the issue.
> Basically my table2Mapped, which is created from stream2, has some messages
> that arrive later than the corresponding records in stream1 for the same key.
> After checking stream-to-table join semantics, I found that the left side is
> joined to the right side only against the record that exists for that key
> on the right side at that time.
> It will not join if records arrive on the right side at a later time, since
> windowed joins are not applicable to stream-to-table joins.
> Anyway, a question here can be: does a stream-to-table windowed join make
> sense, like in this case?
> The reason I mapped stream2 to table2Mapped is that stream2 usually
> has only one record per key; in some cases it may have multiple records
> with the same value for the same key.
> Hence converting to a table made sense, as I am only interested in the latest
> record for a key.
> But I guess if that record arrives later than some other record in stream1
> for the same key, it won't get joined.
> So now I have switched back to a stream-to-stream windowed join.
> Let me know if there is any other way to handle such a case.
> Thanks
> Sachin
> On Sat, Dec 7, 2019 at 12:08 PM Sachin Mittal <sjmittal@gmail.com> wrote:
> > Hi,
> > I am facing some weird problems when joining two streams or a
> > stream/table.
> > The joined stream does not contain all the joined records.
> >
> > Also note that my keys are custom keys, for which I have implemented the
> > equals and hashCode methods.
> > Is there anything else I need to do to ensure key1 equals key2?
> >
> > Let me illustrate it with an example:
> > //created a new stream with a new custom key
> > stream1Mapped = stream1.map((k,v) -> ...)
> > //checked the re-partition data and it has 2 records
> >
> > // created a new stream with a new custom key and converted a stream to
> > table using standard way
> > table2Mapped = stream2.map((k,v) -> ...).groupByKey().reduce((av, nv) -> nv)
> > //checked the re-partition and change-log data and it has 3 records
> >
> > //now I join the stream with table on the new custom key
> > joinedStream = stream1Mapped.join(table2Mapped, (lv, rv) -> ...)
> >
> > //printed the data for stream
> > joinedStream.peek((k, v) -> print(v))
> > //is called only once ??
> >
> > This should be called twice, as the keys for both records in the stream
> > are present in the table too.
> >
> > Please let me know if I understood the case well enough and if there is
> > anything I can do to debug this problem better.
> >
> > Thanks
> > Sachin
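On the custom-key question quoted above: a key class used in a join needs consistent equals/hashCode for state-store lookups, and its Serde must produce identical bytes for equal keys, since repartitioning and store keys are compared by their serialized form. A minimal sketch of such a key class (OrderKey and its fields are hypothetical):

```java
import java.util.Objects;

public class CustomKeySketch {
    // Hypothetical custom join key. equals/hashCode must agree, and the
    // Serde used for this type must serialize equal keys to equal bytes.
    static final class OrderKey {
        final String userId;
        final long orderId;

        OrderKey(String userId, long orderId) {
            this.userId = userId;
            this.orderId = orderId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof OrderKey)) return false;
            OrderKey k = (OrderKey) o;
            return orderId == k.orderId && userId.equals(k.userId);
        }

        @Override public int hashCode() {
            return Objects.hash(userId, orderId);
        }
    }

    public static void main(String[] args) {
        OrderKey a = new OrderKey("u1", 42L);
        OrderKey b = new OrderKey("u1", 42L);
        // Two logically equal keys must compare equal and hash identically,
        // or join lookups against the table's state store will miss.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```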

-- Guozhang
