kafka-users mailing list archives

From John Roesler <vvcep...@apache.org>
Subject Re: join not working in relation to how StreamsBuilder builds the topology
Date Tue, 11 Aug 2020 14:56:12 GMT
Hi Mathieu,

Aaah, that is a bummer. The littlest things can be
the hardest to find. Well, I'm glad I was able to
help in some capacity.

Cheers,
-John

On Tue, 2020-08-11 at 16:02 +0200, Mathieu D wrote:
> Hi John,
> 
> Thanks for your answer.
> 
> Reading your suggestions made me re-examine, one more time, the
> partitioner used on each input. I had already checked all of them...
> except one input that my test infrastructure produced with kafkacat.
> It turns out kafkacat does not use the same default partitioner as the
> Kafka Streams world does. Bummer.
> (aaand I spent entire days on it)
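To make the kafkacat pitfall concrete: as far as I know, kafkacat uses librdkafka's default `partitioner`, `consistent_random`, which takes a CRC32 of the key bytes, while the Java producer (and therefore Kafka Streams) hashes keyed records with murmur2. The stdlib-only sketch below compares the two under those assumptions. The `murmur2` method is a port of the Java client's hash; `PartitionerComparison` and the sample keys are illustrative names, and UTF-8 string keys (what `StringSerializer` produces) are assumed. Whenever the two methods disagree for a key, a record written by kafkacat and the same key written by a Java client land in different partitions, which silently breaks co-partitioning for a join.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Sketch: where does a keyed record land under the Java client's default
// partitioner versus librdkafka's default (consistent_random)?
public class PartitionerComparison {

    // Port of the murmur2 variant used by the Java Kafka client for record keys.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (fall-through is intentional).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Java producer default for a non-null key: toPositive(murmur2(key)) % n.
    static int javaDefaultPartition(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    // librdkafka consistent_random for a non-empty key: crc32(key) % n.
    static int librdkafkaDefaultPartition(String key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        for (String key : new String[] {"customer-1", "customer-2", "customer-3", "customer-4"}) {
            int java = javaDefaultPartition(key, partitions);
            int rdkafka = librdkafkaDefaultPartition(key, partitions);
            System.out.printf("%-12s java=%d librdkafka=%d%s%n", key, java, rdkafka,
                    java == rdkafka ? "" : "  <-- mismatch");
        }
    }
}
```

librdkafka's `partitioner` property also accepts `murmur2_random`, documented as equivalent to the Java producer's default partitioner; passing it to kafkacat through `-X` should align the two, though the exact property syntax may depend on the kafkacat/librdkafka version.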
> 
> Problem solved
> Thanks
> 
> Mathieu
> 
> On Tue, Aug 11, 2020 at 07:18, John Roesler <vvcephei@apache.org> wrote:
> 
> > Hi Mathieu,
> > 
> > That sounds frustrating. I’m sorry for the trouble.
> > 
> > From what you described, it does sound like something wacky is going on
> > with the partitioning, particularly given that both joins work when you
> > set everything to 1 partition.
> > 
> > You mentioned that you’re using the default partitioner everywhere. Can
> > you confirm whether all the source topics and all the repartition topics
> > also have the same number of partitions (when you’re not forcing them to 1,
> > of course)?
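John's co-partitioning question can be illustrated with a toy model. The sketch below is not Kafka Streams code: it assumes one task per partition, where task *i* can only look up keys in partition *i* of the table's state store, and it uses two hypothetical stand-in partitioners (`String.hashCode` and CRC32), not Kafka's real defaults. `CoPartitioningDemo`, `KeyPartitioner`, and `countMissedLookups` are illustrative names. With a single partition every lookup hits no matter which partitioner wrote the data; with more partitions, mixing partitioners makes many lookups miss, which matches joins that only work with 1 partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

// Toy model of why a stream-table join needs co-partitioned inputs.
public class CoPartitioningDemo {

    // Hypothetical functional interface: maps a key to a partition number.
    interface KeyPartitioner {
        int partition(String key, int numPartitions);
    }

    // Stand-in partitioner #1 (not Kafka's default).
    static int hashCodePartitioner(String key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    // Stand-in partitioner #2 (not Kafka's default).
    static int crc32Partitioner(String key, int n) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % n);
    }

    // Counts stream records whose key is absent from their own partition's store.
    static int countMissedLookups(List<String> keys, int partitions,
                                  KeyPartitioner streamSide, KeyPartitioner tableSide) {
        // One local "state store" per partition, filled from the table topic.
        List<Map<String, String>> stores = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            stores.add(new HashMap<>());
        }
        for (String key : keys) {
            stores.get(tableSide.partition(key, partitions)).put(key, "table-value-for-" + key);
        }
        int missed = 0;
        for (String key : keys) {
            // The task handling this stream partition only sees its own store.
            Map<String, String> localStore = stores.get(streamSide.partition(key, partitions));
            if (localStore.get(key) == null) {
                missed++; // a leftJoin would emit (streamValue, null) here
            }
        }
        return missed;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add("key-" + i);
        }
        for (int partitions : new int[] {1, 4}) {
            int same = countMissedLookups(keys, partitions,
                    CoPartitioningDemo::hashCodePartitioner, CoPartitioningDemo::hashCodePartitioner);
            int mixed = countMissedLookups(keys, partitions,
                    CoPartitioningDemo::hashCodePartitioner, CoPartitioningDemo::crc32Partitioner);
            System.out.printf("partitions=%d same partitioner: %d misses, mixed: %d misses%n",
                    partitions, same, mixed);
        }
    }
}
```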
> > 
> > Are the transformers changing the keys? If they are not, then you can use
> > transformValues to avoid the repartition. If they are, then the repartition
> > topics are indeed necessary. Streams should ensure that the repartition
> > topics get the same number of partitions as the topic you’re joining with.
> > 
> > As you mentioned, I can only speculate without seeing the code. I think my
> > next step would be to find a specific null join output that you think
> > should have been non-null and trace the key back through the topology. You
> > should be able to locate the key in each of the topics and state stores to
> > verify that it’s in the right partition all the way through.
> > 
> > You could also experiment with the trace logs, but they are super verbose.
> > Or you could try running the app in an IDE and setting breakpoints to
> > figure out what is happening each step of the way.
> > 
> > The funny thing about a leftJoin in particular is that it would only be
> > null if you’re getting records from the right, but none from the left. Any
> > record from the left would instead produce a K:(LeftVal, null) result. It
> > seems like even if the repartition is somehow going to the wrong partition,
> > you should see the (left, null) result at some point. I’m struggling to
> > think why you would only see null results.
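The left-join behaviour John describes, where any left record yields K:(LeftVal, null) on a miss, can be sketched in a few lines. This is a simplified stdlib model of a KStream-KTable leftJoin, not Kafka Streams code; `StreamTableLeftJoinModel`, `Joined`, and the sample data are illustrative. Each left record produces exactly one result, and the right-hand value is null whenever the task's local copy of the table lacks the key, which is what a co-partitioning mismatch produces even when the key exists somewhere in the table topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model of stream-table leftJoin semantics (not Kafka Streams code).
public class StreamTableLeftJoinModel {

    // One join result: K -> (leftValue, rightValueOrNull).
    static final class Joined {
        final String key;
        final String left;
        final String right;

        Joined(String key, String left, String right) {
            this.key = key;
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return key + ":(" + left + ", " + right + ")";
        }
    }

    // Every left record yields one result; a miss in the local table gives null.
    static List<Joined> leftJoin(List<Map.Entry<String, String>> streamRecords,
                                 Map<String, String> localTable) {
        List<Joined> results = new ArrayList<>();
        for (Map.Entry<String, String> record : streamRecords) {
            // Map.get returns null on a miss, mirroring the null right-hand
            // value a ValueJoiner receives in a left join.
            String right = localTable.get(record.getKey());
            results.add(new Joined(record.getKey(), record.getValue(), right));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> table = Map.of("a", "A");
        List<Map.Entry<String, String>> stream = List.of(Map.entry("a", "1"), Map.entry("b", "2"));
        // Prints a:(1, A) then b:(2, null)
        leftJoin(stream, table).forEach(System.out::println);
    }
}
```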
> > 
> > 
> > I hope this helps!
> > -John
> > 
> > 
> > On Mon, Aug 10, 2020, at 09:44, Mathieu D wrote:
> > > Dear community,
> > > 
> > > I have a tough problem that I'm struggling to diagnose and fix.
> > > I'm not sure whether it's a bug in Kafka Streams or some subtlety of the
> > > DSL API that I missed.
> > > 
> > > The problem is the following.
> > > We have a fairly elaborate Streams app running well in production. We'd
> > > like to add a left join with a KTable (the data comes from a DB via the
> > > Kafka Connect JDBC source).
> > > So we end up with a topology like this:
> > > 
> > > Event Source ---- (some transformations and joins) ----- leftJoin(A: KTable) ----- leftJoin(B: KTable) ---- sinks
> > > 
> > > The new leftJoin is the one joining A.
> > > The transformations are several custom Transformers.
> > > 
> > > In tests with TopologyTestDriver, all is good: we can validate the
> > > general logic.
> > > In integration tests with a real Kafka (in Docker, though), we can't
> > > get both left joins to work at the same time!
> > > The leftJoin with `A` always returns null.
> > > 
> > > I ran dozens of tests, tweaking and fiddling with everything, and I found
> > > that it's related to partitioning. If I force the number of partitions
> > > down to 1 (by setting all input topics to 1 partition), the join works.
> > > In one of the tests, I suspected one of the transformations, so I removed
> > > it. The topology shown by describe() changed quite significantly (going
> > > from 2 subtopologies to 1), and this made the leftJoin with A work... and
> > > the leftJoin with B fail.
> > > 
> > > It drives me crazy.
> > > 
> > > Enabling the optimization didn't help.
> > > The input topics for KTables A and B are read with a TimestampExtractor
> > > that returns 0, since this is static data, to make sure we don't run into
> > > timestamp-ordering issues.
> > > We double-checked and triple-checked the keys at various stages, and
> > > we're sure they're good (by the way, it works with 1 partition).
> > > The partitioner is always the default everywhere (in inputs,
> > > Kafka Connect...); we never touch it.
> > > 
> > > It really seems related to the repartitioning that StreamsBuilder places
> > > in the topology (probably in relation to the transformers?).
> > > 
> > > So I imagine you can't help much without seeing the code, but if you
> > > think of anything that could help diagnose this further, please tell me.
> > > 
> > > Mathieu
> > > 

