kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Deduplicating KStream-KStream join
Date Thu, 04 May 2017 22:54:52 GMT
Hi,

we don't believe in triggers ;)
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

-> Thus, it's a BigQuery flaw to not support updates... (IMHO)

(We are also considering improving KStream-KStream join though, but
that's of course no short term solution for you:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)

The simplest way to work around this within Streams would be to use a
KTable -- but it might not be 100% guaranteed that all duplicates get
filtered (as we flush on commit) -- you get only very high probability:
http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step


However, building de-duplication yourself is not that hard. Just use a
#transform() with a state store. Put all results into the store; a
duplicate for the same key then "overwrites" the older (false) result.

The only tricky part is to decide how long you want to delay your
output before you evict a result from your state and send it downstream.
As it is stream processing, there might always be a late-arriving record.
Thus, if you decide that a <key, x-null> result should be evicted from
your store and sent to BigQuery, you might still be wrong, and a
<key, x-y> update might arrive at your de-duplication filter later...
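
To make the overwrite-then-evict idea concrete, here is a minimal sketch
in plain Java. It is not Kafka Streams code: DedupBuffer, put(), evict()
and delayMs are hypothetical names, and an in-memory map stands in for
the state store. In a real topology this logic would presumably live
inside the transformer passed to #transform(), backed by a state store
and called periodically to evict.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: keeps the latest result per key and releases it after a delay. */
final class DedupBuffer {

    /** Latest value for a key plus the time the key was first buffered. */
    private static final class Entry {
        final long firstSeenMs;
        String value;

        Entry(long firstSeenMs, String value) {
            this.firstSeenMs = firstSeenMs;
            this.value = value;
        }
    }

    private final long delayMs;
    // Stand-in for the state store (assumption: String keys and values).
    private final Map<String, Entry> store = new LinkedHashMap<>();

    DedupBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    /** A newer result for a key overwrites the older one; the first-seen time is kept. */
    void put(String key, String value, long nowMs) {
        Entry existing = store.get(key);
        if (existing == null) {
            store.put(key, new Entry(nowMs, value));
        } else {
            existing.value = value;
        }
    }

    /** Removes and returns every result that has been buffered for at least delayMs. */
    Map<String, String> evict(long nowMs) {
        Map<String, String> ready = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Entry>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Entry> e = it.next();
            if (nowMs - e.getValue().firstSeenMs >= delayMs) {
                ready.put(e.getKey(), e.getValue().value);
                it.remove();
            }
        }
        return ready;
    }
}
```

With delayMs = 1000, buffering <key, x-null> at time 0 and <key, x-y> at
time 500 makes evict(1000) return only <key, x-y>. An update that
arrives after the eviction is buffered as a fresh entry and is emitted
again later, which is exactly the late-record caveat described above.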

-> this problem is the actual reason why we don't believe in triggers
-- it's a simplification to assume that you can trigger at some point
and get a final result (you can never be sure...) -- we think a
cleaner solution is the ability to handle "change" (ie, updates) end-to-end.

Thus, the "cleanest" solution would actually be to make your downstream
applications aware of "late updates". As you can't do anything about
BigQuery itself, you could let your downstream application deal with it
-- ie, when you query BigQuery, filter out the outdated result records
with some SQL magic. Not sure how feasible this approach would be for
your use case. If it does not work, you can still go with the
#transform() step if you can define a good policy for how long to wait
for late data.


Hope this helps.


-Matthias




On 5/4/17 8:26 AM, Ofir Sharony wrote:
> Hi guys,
> 
> I want to perform a join between two KStreams.
> An event may appear only on one of the streams (either one of them), so I
> can't use inner join (which emits only on a match) or left join (which
> emits only when the left input arrives).
> This leaves me with outer join. The problem with outer join is that it
> emits on every record arrival, which creates duplicates at the output node.
> 
> My downstream application is BigQuery, which doesn't support updates, thus
> can't do the dedup by itself.
> What is the best practice implementing deduplication in KafkaStreams,
> keeping only the latest, most updated record?
> Is it possible to emit a record only after some time has passed, or upon a
> certain trigger?
> 
> Thanks.
> 
> *Ofir Sharony*
> BackEnd Tech Lead
> 
> Mobile: +972-54-7560277 <+972%2054-756-0277> | ofir.sharony@myheritage.com
> | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
> 
> <http://www.myheritage.com/>
> 
> <https://www.facebook.com/myheritage>
> <https://twitter.com/myheritage>         <http://blog.myheritage.com/>
>     <https://www.youtube.com/user/MyHeritageLtd>
> 

