kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mathieu D <matd...@gmail.com>
Subject Re: join not working in relation to how StreamsBuilder builds the topology
Date Tue, 11 Aug 2020 14:02:51 GMT
Hi John,

Thanks for your answer.

Reading your suggestions forced me to reconsider one more time the
partitioner set on inputs. I already challenged all of them .. except one
input that was provided in my test infrastructure by kafkacat.
And it appears kafkacat is not using by default the same partitioner as we
have in kafka-streams world. Bummer.
(aaand I spent entire days on it)

Problem solved


Le mar. 11 août 2020 à 07:18, John Roesler <vvcephei@apache.org> a écrit :

> Hi Mathieu,
> That sounds frustrating. I’m sorry for the trouble.
> From what you described, it does sound like something wacky is going on
> with the partitioning. In particular, the fact that both joins work when
> you set everything to 1 partition.
> You mentioned that you’re using the default partitioner everywhere. Can
> you confirm whether all the source topics and all the repartition topics
> also have the same number of partitions (when you’re not forcing them to 1,
> of course)?
> Are the transformers changing the keys? If they are not, then you can use
> transformValue to avoid the repartition. If they are, then the repartition
> topics are indeed necessary. Streams should ensure that the repartition
> topics get the same number of partitions as the topic you’re joining with.
> As you mentioned, I can only speculate without seeing the code. I think my
> next step would be to find a specific null join output that you think
> should have been non-null and trace the key back through the topology. You
> should be able to locate the key in each of the topics and state stores to
> verify that it’s in the right partition all the way through.
> You could also experiment with the trace logs, but they are super verbose.
> Or you could try running the app in an IDE and setting breakpoints to
> figure out what is happening each step of the way.
> The funny thing about a leftJoin in particular is that it would only be
> null if you’re getting records from the right, but none from the left. Any
> record from the left would instead produce a K:(LeftVal, null) result. It
> seems like even if the repartition is somehow going to the wrong partition,
> you should see the (left, null) result at some point. I’m struggling to
> think why you would only see null results.
> I hope this helps!
> -John
> On Mon, Aug 10, 2020, at 09:44, Mathieu D wrote:
> > Dear community,
> >
> > I have a quite tough problem I struggle to diagnose and fix.
> > I'm not sure if it's a bug in Kafka-streams or some subtlety I didn't get
> > in using the DSL api.
> >
> > The problem is the following.
> > We have a quite elaborate stream app, working well, in production. We'd
> > like to add a left join with a KTable (data is coming from a DB via kafka
> > connect jdbc source).
> > So we end-up with a topology like this:
> >
> > Event Source ---- (some transformations and joins) ----- leftJoin(A:
> > KTable) ----- leftJoin(B: KTable) ---- sinks
> >
> > The new leftjoin is the one joining A.
> > The transformations are several custom Transformers.
> >
> > In tests with TopologyTestDriver, all is good, we can validate the
> general
> > logic.
> > In integration tests with a real Kafka (in a docker, though), we can't
> > manage to have both left joins work at the same time !
> > The leftJoin with `A` always return null.
> >
> > I ran dozen of tests, tweaking and fiddling everything, and I found out
> > that it's related to partitioning. If I force the number of partitions
> down
> > to 1 (by setting all input topics to 1 partition), the join works.
> > In one of the tests, I suspected one of the transformations, so I removed
> > it. The topology shown by describe() changed quite significantly (going
> > from 2 subtoplogies to 1), and this made the leftJoin with A work...and
> the
> > leftJoin with B fail.
> >
> > It drives me crazy.
> >
> > Activating the optimization didn't help.
> > The input topics for KTables A and B are read with a TimestamExtractor to
> > 0, since this is static data, to make sure we don't run into timestamp
> > ordering issues.
> > We double-checked and tripled-checked the keys in various stages, and
> we're
> > sure they're good (by the way, it works with 1 partition).
> > Partitioner is always the default everywhere (in inputs,
> kafka-connect...),
> > actually we never touch this.
> >
> > Actually it seems related to repartitioning placed in the topology by
> > StreamsBuilder (probably in relation to transformers ?)
> >
> > So, I imagine you can't help much without seeing the code, but if you
> think
> > of anything that could help diagnosing this further, please tell.
> >
> > Mathieu
> >

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message