kafka-users mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?
Date Tue, 01 May 2018 19:29:25 GMT
Hi Artur,

Thanks for the clarification.

I don't think that ".toStream()" actually does anything besides change the
context from KTable to KStream in the DSL. The Javadoc says:

> Note that this is a logical operation and only changes the
> "interpretation" of the stream, i.e., each record of this changelog
> stream is no longer treated as an updated record (cf. {@link KStream} vs
> {@code KTable}).


Not to belabor the point, but I wouldn't want you to focus too much on
getting rid of the "toStream" in favor of the same methods on KTable,
as I think that would have exactly the same semantics.

It's entirely possible that some additional tuning on the join could reduce
the duplicates you're seeing. For example, what are your current settings
for commit interval and dedup cache size?
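Both settings are ordinary Streams configs: `commit.interval.ms` and `cache.max.bytes.buffering` (exposed in `StreamsConfig` as `COMMIT_INTERVAL_MS_CONFIG` and `CACHE_MAX_BYTES_BUFFERING_CONFIG`). The sketch below sets them on a plain `java.util.Properties`; the values, the application id and the broker address are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Sketch: the two Streams settings that control how much the record cache can
// collapse repeated updates before forwarding them downstream.
// All values here are illustrative assumptions.
class DedupTuning {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "claims-app");        // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        // How often Streams commits; the record cache is flushed on commit.
        props.put("commit.interval.ms", "30000");
        // Total bytes for record caching, shared across the instance's stream threads
        // (newer releases deprecate this key in favor of "statestore.cache.max.bytes").
        props.put("cache.max.bytes.buffering", String.valueOf(50L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Larger values trade fewer downstream updates for higher latency and memory use; they still do not guarantee that every duplicate is removed.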

In any case, though, Kafka Streams's deduplication mechanism is only
best-effort. So if your correctness depends on unique events (as yours
does), I still think you're better off coding in anticipation of
duplicates. For example, you could implement hashCode and equals on
ClaimAndPayment and store them in a LinkedHashSet (to preserve both
uniqueness and order).

Hope that helps,
-John


On Tue, May 1, 2018 at 12:40 PM, Artur Mrozowski <artmro@gmail.com> wrote:

> Hi John,
> yes, the answer is very helpful and your understanding of the data flow is
> correct. However, deduplication of the input is not the issue, because no
> duplicates will be inserted into the flow.
> The duplicates are generated from unique records after the join between
> claims and payments, when the result is converted to a stream.
> But perhaps that stream is entirely avoidable?
>
> So it would look something like this:
>
> KTable<String,ArrayList> left:
>
> {"claimcounter": 0, "claimreporttime": 55948.33110985625,
> "claimnumber": "3_0", "claimtime": 708.521153490306}
>
> and KTable<String,ArrayList> right:
>
> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
> "payment": 847015.1437781961}
>
> When I left-join these two objects, the result in the state store is an
> object containing two ArrayLists, left and right, like this:
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306",
> "claimreporttime":"55948.33110985625","claimcounter":"0"}]},
> "paymentList":{"lst":[{"payment":847015.1437781961,
> "paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>
> But I want to continue processing the results using groupBy and
> aggregate, so I convert the result of the left join to a stream. Now the
> resulting repartition and changelog topics contain two identical
> messages, like this:
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306",
> "claimreporttime":"55948.33110985625","claimcounter":"0"}]},
> "paymentList":{"lst":[{"payment":847015.1437781961,
> "paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>
> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306",
> "claimreporttime":"55948.33110985625","claimcounter":"0"}]},
> "paymentList":{"lst":[{"payment":847015.1437781961,
> "paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>
> Best regards
> Artur
>
>
> On Mon, Apr 30, 2018 at 5:30 PM, John Roesler <john@confluent.io> wrote:
>
> > Hello Artur,
> >
> > Apologies in advance if I say something incorrect, as I'm still a little
> > new to this project.
> >
> > If I followed your example, then I think the scenario is that you're
> > joining "claims" and "payments", grouping by "claimNumber", and then
> > building a list for each "claimNumber" of all the claim/payment pairs. Is
> > that right?
> >
> > It's not in your example, but the grouped stream or table for "claims"
> > (claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the
> > same key, right? In that case, the result of their join will also be
> > keyed by that same key.
> >
> > It seems like the problem you're seeing is that the list contains the
> > same claim/payment pair multiple times for a given claimNumber. Did I
> > get it?
> >
> > In that case, I don't know if what you're seeing is the same issue Damian
> > reported in KAFKA-4609, since the problem he reported was that there was
> > no deduping cache after the join, only before it, unless you register a
> > state store representing the join itself. In your case, it looks like you
> > do register a state store representing the join, the
> > "CLAIM_AND_PAYMENT_JOIN_STORE", so you will have a cache that can dedup
> > the join result.
> >
> > Note that the join itself is what causes duplicates, not the subsequent
> > "claimAndPaymentKTable.toStream()". For example, if I see input like this:
> >
> > (left stream):
> > t1: k1 -> L1
> > t3: k1 -> L1
> >
> > (right stream):
> > t2: k1 -> R1
> >
> > Then, without deduplication, the resulting join would be:
> > (left.leftJoin(right) stream):
> > t1: k1 -> (L1, null)
> > t2: k1 -> (L1, R1)
> > t3: k1 -> (L1, R1)
> >
> > Note that we see apparently duplicate join results, but really the
> > meaning of the join stream is "as of right now, this is the value for
> > this key", so from the join's perspective it's not wrong to say "as of
> > t2, k1's value is (L1, R1)" and then to say it again at t3.
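To make the timeline above concrete, here is a toy model in plain Java (no Kafka dependency) of the left-join behaviour described: each side keeps the latest value per key, and every update on either side re-emits the current join result for that key. It is a simplified model of the semantics, not Kafka Streams' implementation, and `ToyLeftJoin` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of KTable-KTable left-join semantics (not Kafka's implementation):
// every update to either side re-emits "the current join result for this key".
class ToyLeftJoin {
    private final Map<String, String> left = new HashMap<>();
    private final Map<String, String> right = new HashMap<>();
    final List<String> output = new ArrayList<>();

    void updateLeft(String ts, String key, String value) {
        left.put(key, value);
        emit(ts, key);
    }

    void updateRight(String ts, String key, String value) {
        right.put(key, value);
        emit(ts, key);
    }

    private void emit(String ts, String key) {
        String l = left.get(key);
        if (l == null) {
            return; // in this toy model, nothing is emitted until the left side has a value
        }
        // A missing right value is rendered as "null", as in the timeline above.
        output.add(ts + ": " + key + " -> (" + l + ", " + right.get(key) + ")");
    }

    public static void main(String[] args) {
        ToyLeftJoin join = new ToyLeftJoin();
        join.updateLeft("t1", "k1", "L1");
        join.updateRight("t2", "k1", "R1");
        join.updateLeft("t3", "k1", "L1"); // same left value arrives again
        join.output.forEach(System.out::println);
    }
}
```

Running `main` prints the three lines from the timeline, including the repeated `(L1, R1)` at t3, even though no input record was a duplicate of a join result.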
> >
> > In Kafka Streams, there is a deduplication cache which can reduce such
> > duplicate events, but without unbounded memory the cache can't guarantee
> > removal of all duplicates, so it's important to handle the join result
> > in a semantically robust way.
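A rough model of why such a cache is only best-effort, assuming it keeps the latest value per key and forwards everything when it is flushed (on commit, or when it runs out of space): two identical updates that land in the same flush window collapse into one, but if a flush happens between them, both are forwarded. This is a deliberately simplified sketch under those assumptions, not the actual record-cache code; `ToyRecordCache` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a per-key record cache (not Kafka's real implementation):
// updates to the same key overwrite each other until the next flush.
class ToyRecordCache {
    private final Map<String, String> dirty = new LinkedHashMap<>();
    final List<String> forwarded = new ArrayList<>();

    void put(String key, String value) {
        dirty.put(key, value); // a later update for the same key replaces the earlier one
    }

    // Called on commit (or eviction): forward the latest value per key downstream.
    void flush() {
        dirty.forEach((k, v) -> forwarded.add(k + " -> " + v));
        dirty.clear();
    }

    public static void main(String[] args) {
        ToyRecordCache sameWindow = new ToyRecordCache();
        sameWindow.put("k1", "(L1, R1)");
        sameWindow.put("k1", "(L1, R1)");
        sameWindow.flush();
        System.out.println(sameWindow.forwarded); // one record forwarded

        ToyRecordCache splitWindow = new ToyRecordCache();
        splitWindow.put("k1", "(L1, R1)");
        splitWindow.flush(); // e.g. the commit interval elapsed in between
        splitWindow.put("k1", "(L1, R1)");
        splitWindow.flush();
        System.out.println(splitWindow.forwarded); // two identical records forwarded
    }
}
```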
> >
> > I think this also contains the key to resolving your issue: inside your
> > aggregator, instead of storing a list of *every event*, I think you'll
> > want to store a map of the *latest event by key*. (This would be the key
> > that's common to claimStrGrouped, paymentGrouped, and
> > claimAndPaymentKTable.) This way, you'll automatically overwrite old,
> > obsolete join results with new ones for the same key (whether or not the
> > old result happens to be the same as the new one).
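A minimal sketch of that aggregator idea in plain Java. Assumption: after the groupBy, the aggregator receives the *regrouped* key (the Integer claim prefix), not the original join key, so the join key has to be recoverable from the value; here it is carried in a hypothetical `joinKey` field. `ClaimAndPayment` and `ClaimPaymentsByKey` are simplified, hypothetical stand-ins, and `aggregate` only mirrors the `(key, value, aggregate) -> aggregate` shape of a Kafka Streams `Aggregator`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified join result; joinKey is the key the two KTables
// were joined on (e.g. "3_0"), carried in the value so the aggregator can see it.
final class ClaimAndPayment {
    final String joinKey;
    final Double payment; // null if no payment has been joined yet

    ClaimAndPayment(String joinKey, Double payment) {
        this.joinKey = joinKey;
        this.payment = payment;
    }
}

// Hypothetical aggregate that keeps only the latest join result per join key.
final class ClaimPaymentsByKey {
    final Map<String, ClaimAndPayment> latestByKey = new LinkedHashMap<>();

    // Same shape as an aggregator: (groupKey, value, aggregate) -> aggregate.
    static ClaimPaymentsByKey aggregate(Integer groupKey, ClaimAndPayment value,
                                        ClaimPaymentsByKey agg) {
        // Overwrite any older result for this join key, whether or not it is identical.
        agg.latestByKey.put(value.joinKey, value);
        return agg;
    }
}
```

In the pipeline quoted below, this would take the place of the `claimAndPay2.claimAndPaymentList.add(claimPay)` call: a repeated `(L1, R1)` simply overwrites itself, and an updated payment replaces the older, obsolete result.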
> >
> > Does this help?
> > -John
> >
> > On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <artmro@gmail.com> wrote:
> >
> > > Hi,
> > > a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> > > KTable joins, followed by groupBy and aggregate on a KStream, and one
> > > additional join. Now this KTable/KTable join followed by groupBy and
> > > aggregate generates duplicates.
> > >
> > >
> > >
> > > I wonder if a possible workaround would be to remove the KStream after
> > > the KTable/KTable join and do the groupBy and aggregate on the KTable?
> > >
> > >
> > > KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
> > >         customerGrouped.leftJoin(policyGrouped, (customer, policy) ->
> > >                 new CustomerAndPolicy(customer, policy));
> > >
> > > KTable<String,ClaimAndPayment> claimAndPaymentKTable =
> > >         claimStrGrouped.leftJoin(paymentGrouped, (claim, payment) ->
> > >                 new ClaimAndPayment(claim, payment),
> > >                 claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);
> > >
> > > KStream<String,ClaimAndPayment> claimAndPaymentKStream =
> > >         claimAndPaymentKTable.toStream(); // Can we remove this and avoid KAFKA-4609?
> > >
> > > KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> > >         claimAndPaymentKStream
> > >                 .groupBy((k, claimPay) ->
> > >                                 (claimPay.claimList != null)
> > >                                         ? Integer.parseInt(claimPay.claimList.claimRecords.get(0)
> > >                                                 .claimnumber.split("_")[0])
> > >                                         : 999,
> > >                         integerSerde, claimAndPaymentSerde)
> > >                 .aggregate(
> > >                         ClaimAndPayment2::new,
> > >                         (claimKey, claimPay, claimAndPay2) -> {
> > >                             claimAndPay2.claimAndPaymentList.add(claimPay);
> > >                             return claimAndPay2;
> > >                         },
> > >                         claimAndPayment2Serde,
> > >                         CLAIM_AND_PAYMENT_STORE);
> > >
> > > Best regards
> > > Artur Mrozowski
> > >
> >
>
