kafka-users mailing list archives

From Artur Mrozowski <art...@gmail.com>
Subject Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?
Date Tue, 01 May 2018 17:40:25 GMT
Hi John,
yes, the answer is very helpful and your understanding of the data flow is
correct. Deduplication is not the issue, though, because no duplicates are
ever inserted into the flow. Rather, the duplicates are generated from
unique records after the join between claims and payments, when the result
is converted to a stream.
But perhaps that stream is entirely avoidable?

So it would look something like this:

KTable<String,ArrayList> left

{"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}

and KTable<String,ArrayList> right

{"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}

When I leftJoin these two objects, the result in the state store is an
object containing two ArrayLists, left and right, like this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

But I want to continue processing the result with groupBy and aggregate,
so I convert the result of the leftJoin to a stream. Now the resulting
repartition and changelog topics contain two identical messages, like
this

{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
{"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","
claimreporttime":"55948.33110985625","claimcounter":"
0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}

Best regards
Artur


On Mon, Apr 30, 2018 at 5:30 PM, John Roesler <john@confluent.io> wrote:

> Hello Artur,
>
> Apologies in advance if I say something incorrect, as I'm still a little
> new to this project.
>
> If I followed your example, then I think the scenario is that you're
> joining "claims" and "payments", grouping by "claimNumber", and then
> building a list for each "claimNumber" of all the claim/payment pairs. Is
> that right?
>
> It's not in your example, but the grouped stream or table for "claims"
> (claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the
> same key, right? In that case, the result of their join will also be keyed
> by that same key.
>
> It seems like the problem you're seeing is that the list contains the same
> claim/payment pair multiple times for a given claimNumber. Did I get it?
>
> In that case, I don't know if what you're seeing is the same issue Damian
> reported in KAFKA-4609, since the problem he reported was that there was no
> deduping cache after the join, only before it, unless you register a state
> store representing the join itself. In your case, it looks like you do
> register a state store representing the join, the
> "CLAIM_AND_PAYMENT_JOIN_STORE".
> So you will have a cache that can dedup the join result.
>
> Note that the join itself is what causes duplicates, not the subsequent
> "claimAndPaymentKTable.toStream()". For example, if I see input like this:
>
> (left stream):
> t1: k1 -> L1
> t3: k1 -> L1
>
> (right stream):
> t2: k1 -> R1
>
> Then, without deduplication, the resulting join would be:
> (left.join(right) stream):
> t1: k1 -> (L1, null)
> t2: k1 -> (L1, R1)
> t3: k1 -> (L1, R1)
>
> Note that we see apparently duplicate join results, but really the meaning
> of the join stream is that "as of right now, this is the value for this
> key", so from the join's perspective it's not wrong to say "as of t2, k1's
> value is (L1, R1)" and then to say it at t3 again.
>
> In Kafka Streams, there is a deduplication cache which can reduce such
> duplicate events, but without unbounded memory, the cache can't guarantee
> to remove all duplicates, so it's important to deal with the join result in
> a semantically robust way.
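>
> For what it's worth, the size of that cache and how often it gets flushed
> are both configurable, which affects how aggressively duplicate emissions
> are coalesced. A minimal sketch, with placeholder values and a made-up
> application id:
>
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "claims-app"); // placeholder
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     // bigger cache => more updates per key are coalesced before forwarding
>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
>     // longer commit interval => the cache flushes (and emits) less often
>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);
>
> Even so, tuning the cache only reduces duplicates; it cannot eliminate them.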
>
> I think this also contains the key to resolving your issue; inside your
> aggregator, instead of storing a list of *every event*, I think you'll want
> to store a map of the *latest event by key*. (This would be the key that's
> common to claimStrGrouped, paymentGrouped, and claimAndPaymentKTable). This
> way, you'll automatically overwrite old, obsolete, join results with new
> ones for the same key (whether or not the old result happens to be the same
> as the new one).
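>
> Concretely, if ClaimAndPayment2 kept a Map instead of a List, the adder in
> your aggregate could look roughly like this. Just a sketch: the
> claimAndPaymentByKey field and the keyOf() helper are hypothetical, with
> keyOf() returning the common join key (the claimnumber string):
>
>     (claimKey, claimPay, claimAndPay2) -> {
>         // latest-wins: a re-emitted join result for the same join key
>         // overwrites the stale entry instead of appending a duplicate
>         claimAndPay2.claimAndPaymentByKey.put(keyOf(claimPay), claimPay);
>         return claimAndPay2;
>     }
>
> The rest of the topology stays the same; only the aggregator and the value
> type change.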
>
> Does this help?
> -John
>
> On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <artmro@gmail.com> wrote:
>
> > Hi,
> > a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> > KTable joins followed by a groupBy and aggregate on a KStream, and one
> > additional join. Now this KTable/KTable join followed by groupBy and
> > aggregate generates duplicates.
> >
> > I wonder if a possible workaround would be to remove the KStream after
> > the KTable/KTable join and do the groupBy and aggregate on the KTable?
> >
> >
> >     KTable<Integer, CustomerAndPolicy> customerAndPolicyGroupedKTable =
> >             customerGrouped.leftJoin(policyGrouped,
> >                     (customer, policy) -> new CustomerAndPolicy(customer, policy));
> >
> >     KTable<String, ClaimAndPayment> claimAndPaymentKTable =
> >             claimStrGrouped.leftJoin(paymentGrouped,
> >                     (claim, payment) -> new ClaimAndPayment(claim, payment),
> >                     claimAndPaymentSerde,
> >                     CLAIM_AND_PAYMENT_JOIN_STORE);
> >
> >     *KStream<String, ClaimAndPayment> claimAndPaymentKStream =
> >             claimAndPaymentKTable.toStream(); // Can we remove this and avoid KAFKA-4609?*
> >
> >     KTable<Integer, ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> >             claimAndPaymentKStream
> >                     .groupBy((k, claimPay) ->
> >                             (claimPay.claimList != null)
> >                                     ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
> >                                     : 999,
> >                             integerSerde, claimAndPaymentSerde)
> >                     .aggregate(
> >                             ClaimAndPayment2::new,
> >                             (claimKey, claimPay, claimAndPay2) -> {
> >                                 claimAndPay2.claimAndPaymentList.add(claimPay);
> >                                 return claimAndPay2;
> >                             },
> >                             claimAndPayment2Serde,
> >                             CLAIM_AND_PAYMENT_STORE);
> >
> > Best regards
> > Artur Mrozowski
> >
>
