kafka-users mailing list archives

From John Roesler <j...@confluent.io>
Subject Re: Workaround for KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results KAFKA-4609?
Date Mon, 30 Apr 2018 15:30:17 GMT
Hello Artur,

Apologies in advance if I say something incorrect, as I'm still a little
new to this project.

If I followed your example, then I think the scenario is that you're
joining "claims" and "payments", grouping by "claimNumber", and then
building a list for each "claimNumber" of all the claim/payment pairs. Is
that right?

It's not in your example, but the grouped stream or table for "claims"
(claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the
same key, right? In that case, the result of their join will also be keyed
by that same key.

It seems like the problem you're seeing is that the list contains the same
claim/payment pair multiple times for a given claimNumber. Did I get it?

In that case, I don't know if what you're seeing is the same issue Damian
reported in KAFKA-4609, since the problem he reported was that there was no
deduping cache after the join, only before it, unless you register a state
store representing the join itself. In your case, it looks like you do
register a state store representing the join,
"CLAIM_AND_PAYMENT_JOIN_STORE", so you will have a cache that can dedup the
join result.

Note that the join itself is what causes duplicates, not the subsequent
"claimAndPaymentKTable.toStream()". For example, if I see input like this:

(left stream):
t1: k1 -> L1
t3: k1 -> L1

(right stream):
t2: k1 -> R1

Then, without deduplication, the resulting join would be:
(left.join(right) stream):
t1: k1 -> (L1, null)
t2: k1 -> (L1, R1)
t3: k1 -> (L1, R1)

Note that we see apparently duplicate join results, but really the meaning
of the join stream is that "as of right now, this is the value for this
key", so from the join's perspective it's not wrong to say "as of t2, k1's
value is (L1, R1)" and then to say it at t3 again.
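
To make the example above concrete, here is a minimal plain-Java sketch (no
Kafka dependency) that replays those three events through a simplified left
join with no deduplication. The class and method names (JoinUpdateDemo,
Event, leftJoinUpdates) are made up for illustration, and the real KTable
join also handles things this sketch ignores, such as tombstones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class JoinUpdateDemo {
    /** Hypothetical input record: side ("L" or "R"), key, value. */
    static final class Event {
        final String side;
        final String key;
        final String value;

        Event(String side, String key, String value) {
            this.side = side;
            this.key = key;
            this.value = value;
        }
    }

    /** Emits one "key -> (left, right)" update per input event, without deduplication. */
    static List<String> leftJoinUpdates(List<Event> events) {
        Map<String, String> left = new HashMap<>();
        Map<String, String> right = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            // Each side keeps only the latest value per key, like a table.
            if (e.side.equals("L")) {
                left.put(e.key, e.value);
            } else {
                right.put(e.key, e.value);
            }
            // Simplified left join: emit the current joined value whenever the left side has one.
            if (left.containsKey(e.key)) {
                out.add(e.key + " -> (" + left.get(e.key) + ", " + right.get(e.key) + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("L", "k1", "L1"),   // t1
                new Event("R", "k1", "R1"),   // t2
                new Event("L", "k1", "L1"));  // t3
        leftJoinUpdates(events).forEach(System.out::println);
    }
}
```

The second and third lines of output are identical, which is the
"duplicate" a downstream list-building aggregator would store twice.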

In Kafka Streams, there is a deduplication cache which can reduce such
duplicate events, but without unbounded memory, the cache can't guarantee
to remove all duplicates, so it's important to deal with the join result in
a semantically robust way.
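
For reference, the two settings that I believe most directly influence how
much the cache can dedup are "cache.max.bytes.buffering" and
"commit.interval.ms". The sketch below only builds a java.util.Properties
object with those keys; the helper name (cacheSettings) and the values are
arbitrary illustrations, not recommendations, and you would merge them into
your existing Streams configuration.

```java
import java.util.Properties;

class CacheConfigSketch {
    /** Hypothetical helper: returns only the cache-related settings. */
    static Properties cacheSettings(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        // Total bytes available for record caches across all threads.
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        // Caches are flushed downstream at least this often, on commit.
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values only: a 10 MB cache and a 30 second commit interval.
        Properties props = cacheSettings(10L * 1024 * 1024, 30_000L);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A larger cache or a longer commit interval can reduce the number of
duplicate updates forwarded downstream, but neither can eliminate them.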

I think this also contains the key to resolving your issue: inside your
aggregator, instead of storing a list of *every event*, I think you'll want
to store a map of the *latest event by key*. (This would be the key that's
common to claimStrGrouped, paymentGrouped, and claimAndPaymentKTable.) This
way, you'll automatically overwrite old, obsolete join results with new
ones for the same key (whether or not the old result happens to be the same
as the new one).
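
Here is a rough plain-Java sketch of what I mean, not a drop-in replacement
for your code. The classes below are hypothetical stand-ins for your
ClaimAndPayment and ClaimAndPayment2, and the joinKey field is an
assumption: since groupBy replaces the record key with the claim number,
the original join key would have to be carried inside the value for the
aggregator to see it. Your serde would also need to handle the map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LatestByKeyDemo {
    /** Hypothetical stand-in for ClaimAndPayment; joinKey is an assumed field. */
    static final class ClaimAndPayment {
        final String joinKey;   // original key of the join result (assumption)
        final String claim;
        final String payment;   // null until a payment has joined

        ClaimAndPayment(String joinKey, String claim, String payment) {
            this.joinKey = joinKey;
            this.claim = claim;
            this.payment = payment;
        }
    }

    /** Hypothetical stand-in for ClaimAndPayment2: a map instead of a list. */
    static final class ClaimAndPayment2 {
        final Map<String, ClaimAndPayment> latestByJoinKey = new LinkedHashMap<>();
    }

    /** Same shape as the aggregator lambda: (groupKey, value, aggregate) -> aggregate. */
    static ClaimAndPayment2 aggregate(Integer claimKey, ClaimAndPayment claimPay,
                                      ClaimAndPayment2 agg) {
        // put() overwrites any earlier result for the same join key, so a
        // repeated or updated join result never adds a second entry.
        agg.latestByJoinKey.put(claimPay.joinKey, claimPay);
        return agg;
    }

    public static void main(String[] args) {
        ClaimAndPayment2 agg = new ClaimAndPayment2();
        aggregate(1, new ClaimAndPayment("1_a", "C1", null), agg);  // claim only
        aggregate(1, new ClaimAndPayment("1_a", "C1", "P1"), agg);  // payment joined
        aggregate(1, new ClaimAndPayment("1_a", "C1", "P1"), agg);  // duplicate update
        System.out.println(agg.latestByJoinKey.size());             // prints 1
    }
}
```

With a list, those three updates would have produced three entries; with
the map, only the latest one survives.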

Does this help?
-John

On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <artmro@gmail.com> wrote:

> Hi,
> a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> KTable joins followed by group by and aggregate on a KStream, and one
> additional join. Now this KTable/KTable join followed by group by and
> aggregate generates duplicates.
>
>
>
> I wonder if a possible workaround would be to remove the KStream after
> the KTable/KTable join and do the groupBy and aggregate on the KTable?
>
>
>  KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
> customerGrouped.leftJoin(policyGrouped,(customer, policy) -> new
> CustomerAndPolicy(customer,policy));
>
>        KTable<String,ClaimAndPayment> claimAndPaymentKTable =
> claimStrGrouped.leftJoin(paymentGrouped,(claim,payment) -> new
> ClaimAndPayment(claim,payment),claimAndPaymentSerde,
> CLAIM_AND_PAYMENT_JOIN_STORE);
>
>
>        // Can we remove this and avoid KAFKA-4609?
>        KStream<String,ClaimAndPayment> claimAndPaymentKStream =
> claimAndPaymentKTable.toStream();
>
>        KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> claimAndPaymentKStream
>                .groupBy((k,claimPay) ->
>                    (claimPay.claimList != null)
>                        ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
>                        : 999,
>                    integerSerde, claimAndPaymentSerde)
>                .aggregate(
>                         ClaimAndPayment2::new,
>                         (claimKey,claimPay,claimAndPay2) -> {
>                                 claimAndPay2.claimAndPaymentList.add(claimPay);
>                                 return claimAndPay2;
>                         },
>                         claimAndPayment2Serde,
>                         CLAIM_AND_PAYMENT_STORE
>                 );
>
>
>
>
>
> Best regards
> Artur Mrozowski
>
