kafka-users mailing list archives

From Adrian McCague <Adrian.McCa...@zopa.com>
Subject RE: Deduplicating KStream-KStream join
Date Fri, 05 May 2017 13:54:03 GMT
Hi Matthias

We have been thinking about this problem recently and thought: wouldn't it be nice if a join
could be configured to be 'one-time' within the retention period of the join window? If
a join has already occurred on a particular key, further ones would be ignored for the remainder
of the retention period (or maybe the retention would be reset). This assumes I have not missed
a feature in the current DSL.

I suppose the implementation would not be much different from putting a transform after the
join as a second state store would be required?
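
To make the 'one-time' idea concrete, here is a minimal sketch of the bookkeeping such a
second store would need. It is a plain-Java model, not Kafka Streams API: the class name
FirstJoinFilter, the accept() method and the in-memory map standing in for the state store
are all hypothetical, and it implements the "ignore for the remainder of the retention
period" variant rather than the "reset the retention" one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, library-free model of a "join once per key" filter. */
class FirstJoinFilter {
    private final long retentionMs;
    // key -> timestamp of the first join result that was let through
    // (stands in for the second state store; an assumption, not a DSL feature)
    private final Map<String, Long> firstSeen = new HashMap<>();

    FirstJoinFilter(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if the join result should be forwarded, false if it is a repeat. */
    boolean accept(String key, long timestampMs) {
        Long first = firstSeen.get(key);
        if (first != null && timestampMs - first < retentionMs) {
            return false; // this key already joined within the retention period
        }
        firstSeen.put(key, timestampMs); // first join, or the earlier entry has expired
        return true;
    }
}
```

In a real topology this check would sit in a transform step after the join, with the map
replaced by a fault-tolerant store; expired entries would also need to be purged, which
this sketch leaves out.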

Adrian
 
-----Original Message-----
From: Matthias J. Sax [mailto:matthias@confluent.io] 
Sent: 04 May 2017 23:55
To: users@kafka.apache.org
Subject: Re: Deduplicating KStream-KStream join

Hi,

we don't believe in triggers ;)
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

-> Thus, it's a BigQuery flaw to not support updates... (IMHO)

(We are also considering improving the KStream-KStream join, but that is of course no
short-term solution for you:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)

The simplest way to work around this within Streams would be to use a KTable -- but it is
not 100% guaranteed that all duplicates get filtered (as we flush on commit) -- you only get
a very high probability:
http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step


However, building the de-duplication yourself is not that hard.
Just use a #transform() with a state store. Put all results into the store so that duplicates
"overwrite" older (false) results.

The tricky part is only to decide how long you want to delay your output before you evict a
result from your state and send it downstream. As it is stream processing, there might always
be a late-arriving record. Thus, if you decide that a <key, x-null> result should
be evicted from your store and sent to BigQuery, you might still be wrong, and a later <key,
x-y> update might arrive in your de-duplication filter...
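
The overwrite-then-evict logic described above can be modelled roughly as follows. This is
only a plain-Java sketch under stated assumptions, not Kafka Streams code: DedupBuffer,
put() and flush() are hypothetical names, the maps stand in for the state store, and
flush() plays the role of a periodic punctuation that forwards every key that has been
buffered for at least delayMs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, library-free model of a de-duplication buffer with delayed output. */
class DedupBuffer {
    private final long delayMs;
    // key -> latest join result; a newer result overwrites an older one
    private final Map<String, String> latest = new LinkedHashMap<>();
    // key -> time the key was first buffered; decides when it may be evicted
    private final Map<String, Long> firstSeen = new LinkedHashMap<>();

    DedupBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Buffers a join result; duplicates for the same key overwrite the old value. */
    void put(String key, String value, long nowMs) {
        latest.put(key, value);
        firstSeen.putIfAbsent(key, nowMs);
    }

    /** Evicts and returns "key=value" for every key buffered for at least delayMs. */
    List<String> flush(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= delayMs) {
                out.add(e.getKey() + "=" + latest.remove(e.getKey()));
                it.remove();
            }
        }
        return out;
    }
}
```

Note that the sketch does not solve the late-arrival problem: if an update for a key
arrives after that key was flushed, it is buffered as a fresh entry and emitted again
later, so the downstream system still sees two records for the key.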

-> this problem is the actual reason why we don't believe in triggers
-- it's a simplification to assume that you can trigger at some point and get a final result
(but you can never be sure....) -- we think a cleaner solution is the ability to handle "change"
(i.e., update) end-to-end.

Thus, the "cleanest" solution would actually be to make your downstream applications aware
of "late updates". As you can't do anything about BigQuery, you could let your downstream application
deal with it -- i.e., when you query BigQuery, filter out the outdated result records with some SQL magic.
Not sure how feasible this approach would be for your use case.
If it does not work, you can still go with the #transform() step if you can define a good
policy for how long to wait for late data.


Hope this helps.


-Matthias




On 5/4/17 8:26 AM, Ofir Sharony wrote:
> Hi guys,
> 
> I want to perform a join between two KStreams.
> An event may appear only on one of the streams (either one of them), 
> so I can't use inner join (which emits only on a match) or left join 
> (which emits only when the left input arrives).
> This leaves me with outer join. The problem with outer join is that it 
> emits on every record arrival, which creates duplicates at the output node.
> 
> My downstream application is BigQuery, which doesn't support updates, 
> thus can't do the dedup by itself.
> What is the best practice implementing deduplication in KafkaStreams, 
> keeping only the latest, most updated record?
> Is it possible to emit a record only after some time has passed, or 
> upon a certain trigger?
> 
> Thanks.
> 
> *Ofir Sharony*
> BackEnd Tech Lead
> 
> Mobile: +972-54-7560277 <+972%2054-756-0277> | 
> ofir.sharony@myheritage.com
> | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
