kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Deduplicating KStream-KStream join
Date Fri, 05 May 2017 17:15:41 GMT
Not sure if I can follow (and what type of join you have in mind).

I assume we are still talking about windowed KStream-KStream joins.

For this scenario, and an inner join, the computation is correct as
implemented. Only left/outer joins have a single flaw: they might
compute a "false" result, as they might emit both <key: x-null> and
<key: x-y>. The first of these (ie, <key: x-null>) does not align with
what you would expect when thinking in terms of SQL.
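
To make the "false" result concrete, here is a minimal sketch in plain
Java. It is not Kafka Streams code: the class EagerOuterJoinSketch and
its methods are made up for illustration, windowing is left out, and
only the latest value per key and side is kept. It assumes an outer
join that emits on every record arrival:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an eagerly emitting outer join; not Kafka Streams code.
class EagerOuterJoinSketch {
    private final Map<String, String> leftSeen = new HashMap<>();
    private final Map<String, String> rightSeen = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    // A record arrives on the left input.
    void onLeft(String key, String value) {
        leftSeen.put(key, value);
        emit(key, value, rightSeen.get(key));
    }

    // A record arrives on the right input.
    void onRight(String key, String value) {
        rightSeen.put(key, value);
        emit(key, leftSeen.get(key), value);
    }

    // Emits immediately; a missing side is rendered as "null".
    private void emit(String key, String left, String right) {
        emitted.add("<" + key + ": " + left + "-" + right + ">");
    }

    public static void main(String[] args) {
        EagerOuterJoinSketch join = new EagerOuterJoinSketch();
        join.onLeft("key", "x");   // no partner has arrived yet
        join.onRight("key", "y");  // the partner arrives later
        System.out.println(join.emitted); // [<key: x-null>, <key: x-y>]
    }
}
```

With the left record arriving first, the sketch emits <key: x-null>
followed by <key: x-y>; the first of the two is the result an SQL user
would not expect.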

However, SQL is based on batch processing, thus its semantics cannot be
applied 1-to-1 to stream processing.

Furthermore, a single record might join multiple times (also for inner
joins) within a window. Thus, I am not sure what you mean by a '1 time'
configuration within the retention period?
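
A small illustration of a record joining more than once, as a toy model
in plain Java rather than Kafka Streams code. The class
WindowedInnerJoinSketch, the join condition (timestamps at most
windowMs apart), and the missing expiry of old records are all
assumptions made for this sketch:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a windowed inner join; not Kafka Streams code.
class WindowedInnerJoinSketch {
    // One buffered record: its timestamp and value.
    private static final class Rec {
        final long ts;
        final String value;
        Rec(long ts, String value) { this.ts = ts; this.value = value; }
    }

    private final long windowMs;
    private final Map<String, List<Rec>> left = new HashMap<>();
    private final Map<String, List<Rec>> right = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    WindowedInnerJoinSketch(long windowMs) { this.windowMs = windowMs; }

    // Buffer a left record and join it with every right record inside the window.
    void onLeft(String key, long ts, String value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(new Rec(ts, value));
        List<Rec> others = right.get(key);
        if (others == null) return;
        for (Rec r : others) {
            if (Math.abs(ts - r.ts) <= windowMs) emit(key, value, r.value);
        }
    }

    // Buffer a right record and join it with every left record inside the window.
    void onRight(String key, long ts, String value) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(new Rec(ts, value));
        List<Rec> others = left.get(key);
        if (others == null) return;
        for (Rec l : others) {
            if (Math.abs(ts - l.ts) <= windowMs) emit(key, l.value, value);
        }
    }

    private void emit(String key, String l, String r) {
        emitted.add("<" + key + ": " + l + "-" + r + ">");
    }

    public static void main(String[] args) {
        WindowedInnerJoinSketch join = new WindowedInnerJoinSketch(1000);
        join.onLeft("key", 0, "x");
        join.onRight("key", 100, "y1");  // inside the window
        join.onRight("key", 200, "y2");  // inside the window as well
        join.onRight("key", 5000, "y3"); // too far from x, no result
        System.out.println(join.emitted);
    }
}
```

Here the single left record x joins twice (with y1 and y2), so a
'1 time' rule would have to say which of these results counts.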

Eager to get more details about this idea -- I like joins a lot and want
to make them better out-of-the-box -- I am not happy about the current
"false" left/outer join results but did not have a good idea about how
to resolve it. Just delaying the output only mitigates the problem but
does not resolve it completely.


On 5/5/17 6:54 AM, Adrian McCague wrote:
> Hi Matthias
> We have been thinking about this problem recently and thought, wouldn't it be nice if
> a join could be configured to be '1 time', within the retention period of the join window.
> So if a join has occurred already on a particular key, further ones will be ignored for the
> remainder of the retention period (or maybe the retention is reset). Assuming I have not missed
> a feature on the current DSL.
> I suppose the implementation would not be much different from putting a transform after
> the join as a second state store would be required?
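
One possible reading of the '1 time' idea, sketched in plain Java as a
filter placed after the join. This is not an existing DSL feature:
OncePerKeyFilterSketch, accept() and the in-memory map standing in for
the second state store are hypothetical, and the sketch assumes the
retention is not reset by ignored results:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical "first join result per key wins" filter; not Kafka Streams code.
class OncePerKeyFilterSketch {
    private final long retentionMs;
    // Stands in for the second state store: key -> time of the accepted result.
    private final Map<String, Long> acceptedAt = new HashMap<>();

    OncePerKeyFilterSketch(long retentionMs) { this.retentionMs = retentionMs; }

    // Returns true if a join result for this key should be forwarded downstream.
    boolean accept(String key, long nowMs) {
        Long seenAt = acceptedAt.get(key);
        if (seenAt != null && nowMs - seenAt < retentionMs) {
            return false; // already joined within the retention period: ignore
        }
        acceptedAt.put(key, nowMs); // first result, or the retention has expired
        return true;
    }

    public static void main(String[] args) {
        OncePerKeyFilterSketch filter = new OncePerKeyFilterSketch(1000);
        System.out.println(filter.accept("k", 0));    // first result for k
        System.out.println(filter.accept("k", 500));  // repeat within retention
        System.out.println(filter.accept("k", 1000)); // retention has expired
    }
}
```

The three calls in main() are accepted, ignored and accepted again, in
that order.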
> Adrian
> -----Original Message-----
> From: Matthias J. Sax [mailto:matthias@confluent.io] 
> Sent: 04 May 2017 23:55
> To: users@kafka.apache.org
> Subject: Re: Deduplicating KStream-KStream join
> Hi,
> we don't believe in triggers ;)
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
> (We are also considering improving KStream-KStream join though, but that's of course
> no short term solution for you:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
> The simplest way to work around this within Streams would be to use a KTable -- but it
> might not be 100% guaranteed that all duplicates get filtered (as we flush on commit) -- you
> only get a very high probability:
> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
> However, building de-duplication yourself is basically not that hard.
> Just use a #transform() with a state store. Put all results into the state and thus, duplicates
> "overwrite" older (false) results.
> The tricky part is only to decide how long you want to delay your output before you evict
> results from your state and send them downstream. As it is stream processing, there might
> always be a late arriving record, and thus, if you decide that a <key, x-null> result should
> be evicted from your store and sent to BigQuery, you might still be wrong, and later a
> <key, x-y> update might arrive in your de-duplication filter...
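
The buffering and eviction logic described here can be sketched in
plain Java as follows. This is only an illustration, not Kafka Streams
code: DelayedDedupSketch, onResult() and evict() are made-up names, the
map stands in for the state store behind #transform(), and the policy
(forward a key once it has waited delayMs since its first result) is
just one assumed choice:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical delaying de-duplication buffer; not Kafka Streams code.
class DelayedDedupSketch {
    // A buffered join result and the time its key was first seen.
    private static final class Buffered {
        final String value;
        final long firstSeenMs;
        Buffered(String value, long firstSeenMs) {
            this.value = value;
            this.firstSeenMs = firstSeenMs;
        }
    }

    private final long delayMs;
    private final Map<String, Buffered> store = new LinkedHashMap<>();
    final List<String> forwarded = new ArrayList<>();

    DelayedDedupSketch(long delayMs) { this.delayMs = delayMs; }

    // Buffer a join result; a newer result for the same key overwrites the older one.
    void onResult(String key, String value, long nowMs) {
        Buffered old = store.get(key);
        long firstSeen = (old == null) ? nowMs : old.firstSeenMs;
        store.put(key, new Buffered(value, firstSeen));
    }

    // Called periodically: evict and forward every key that has waited at least delayMs.
    void evict(long nowMs) {
        Iterator<Map.Entry<String, Buffered>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (nowMs - e.getValue().firstSeenMs >= delayMs) {
                forwarded.add("<" + e.getKey() + ": " + e.getValue().value + ">");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        DelayedDedupSketch dedup = new DelayedDedupSketch(1000);
        dedup.onResult("a", "x-null", 0);
        dedup.onResult("a", "x-y", 100);    // overwrites the false result in time
        dedup.evict(1000);
        dedup.onResult("b", "x-null", 2000);
        dedup.evict(3000);                  // x-null is forwarded...
        dedup.onResult("b", "x-y", 3500);   // ...and the update arrives too late
        dedup.evict(4500);
        System.out.println(dedup.forwarded);
    }
}
```

For key "a" only <a: x-y> is forwarded, but for key "b" both
<b: x-null> and <b: x-y> reach the output, because the update arrived
after the eviction. A longer delay makes this less likely but cannot
rule it out.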
> -> this problem is the actual reason, why we don't believe in triggers
> -- it's a simplification to assume that you can trigger at some point and get a final
> result (but you can never be sure....) -- we think a cleaner solution is the ability to handle
> "change" (ie update) end-to-end.
> Thus, the "cleanest" solution would actually be to make your downstream applications
> aware of "late updates". As you can't do anything about BigQuery, you could let your downstream
> application deal with it -- ie, when you query BigQuery, filter those result records with
> some SQL magic. Not sure how feasible this approach would be for your use case.
> If it does not work, you can still go with the #transform() step if you can define a
> good policy for how long to wait for late data.
> Hope this helps.
> -Matthias
> On 5/4/17 8:26 AM, Ofir Sharony wrote:
>> Hi guys,
>> I want to perform a join between two KStreams.
>> An event may appear only on one of the streams (either one of them), 
>> so I can't use inner join (which emits only on a match) or left join 
>> (which emits only when the left input arrives).
>> This leaves me with outer join. The problem with outer join is that it 
>> emits on every record arrival, which creates duplicates at the output node.
>> My downstream application is BigQuery, which doesn't support updates, 
>> thus can't do the dedup by itself.
>> What is the best practice implementing deduplication in KafkaStreams, 
>> keeping only the latest, most updated record?
>> Is it possible to emit a record only after some time has passed, or 
>> upon a certain trigger?
>> Thanks.
>> *Ofir Sharony*
