kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Deduplicating KStream-KStream join
Date Mon, 08 May 2017 22:08:53 GMT
Ofir,

Yes, KTable would not be 100% reliable:

> The simplest way to work around this within Streams would be to use a
> KTable -- but it might not be 100% guaranteed that all duplicates get
> filtered (as we flush on commit)

Your suggestion (1) to add a TTL is unfortunately not easy to implement
atm. The problem is that on commit, Streams needs to flush all
stores/caches to guarantee at-least-once semantics. If we did not flush
on commit, we would need quite complex logic to track which keys got
flushed and which did not. Not saying it's impossible, but it does not
follow the overall architecture of Streams atm.

About (2):

> Do these ever clean themselves
>    up, or do they live forever and thus grow indefinitely?

For a non-windowed KTable this is no issue as long as the key space is
bounded -- updates just overwrite older key-value pairs, and we apply
log compaction to the changelog topic.
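
As a minimal sketch against the 0.10.x DSL (names illustrative), given
a KStream<String, String> input, a non-windowed aggregation like

    KTable<String, Long> counts = input
        .groupByKey()
        .count("counts-store");  // changelog created with cleanup.policy=compact

is backed by a compacted changelog topic, so the topic stays bounded as
long as the key space is.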

For windowed KTables, we apply an additional retention time to both the
RocksDB store and the changelog topic. As the keys are composite keys
of (original-key, window-start-timestamp), the key space is unbounded.
The retention time ensures that old windows that have not been updated
within the retention period are eventually deleted. The default retention
time is 1 day, and you can set it on a per-store basis using the
Windows#until() method when you define the size of your join or
aggregation window.
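
For example, a minimal sketch against the 0.10.x DSL (stream names,
value encoding, and sizes are all illustrative):

    // 5-minute join window, but old windows are retained for 1 day
    KStream<String, String> joined = left.join(
        right,
        (l, r) -> l + "/" + r,                     // ValueJoiner
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5))
                   .until(TimeUnit.DAYS.toMillis(1)),
        Serdes.String(), Serdes.String(), Serdes.String());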

> In my scenario, the KStream result of my join represents request logs,
>    which are not really suited to be seen as a KTable, as every request is
>    unique.

Thinking about this twice, your comment makes sense and KTable is no
solution for your problem.


I guess the only way to filter out "false" <key, x-null> records would
be a custom transform() with a store that holds "null" records back
until a real result is received, or until some delay has passed and you
assume that there will not be any "real" join result.
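
A minimal sketch of such a transform() against the 0.10.x Processor API,
reusing the illustrative "x/null" value encoding from the join sketch
above (all names are illustrative; isIncomplete() is a hypothetical
helper whose check depends on how your ValueJoiner encodes a missing
side, and a real implementation would also store each record's arrival
time so the punctuation can evict precisely):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class JoinDedupTransformer
            implements Transformer<String, String, KeyValue<String, String>> {

        private static final long DELAY_MS = 60_000L; // grace period for a "real" result

        private ProcessorContext context;
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, String>)
                    context.getStateStore("join-dedup-store");
            context.schedule(DELAY_MS); // punctuate() fires based on stream time
        }

        @Override
        public KeyValue<String, String> transform(final String key, final String value) {
            if (isIncomplete(value)) {  // "false" <key, x-null> result: hold it back
                store.put(key, value);
                return null;            // emit nothing yet
            }
            store.delete(key);          // real result arrived: drop any held-back null
            return KeyValue.pair(key, value);
        }

        @Override
        public KeyValue<String, String> punctuate(final long timestamp) {
            // evict everything still held back; by now we assume no "real"
            // join result will arrive for these keys anymore
            try (final KeyValueIterator<String, String> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, String> entry = iter.next();
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
            return null; // nothing extra to forward
        }

        @Override
        public void close() {}

        private static boolean isIncomplete(final String value) {
            // hypothetical: assumes the joiner produces "x/null"-style values
            return value != null && value.endsWith("/null");
        }
    }

You would wire it up with something like
joined.transform(JoinDedupTransformer::new, "join-dedup-store") after
adding a store with that name to the topology (eg, via Stores.create()
and KStreamBuilder#addStateStore()).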

We hope to improve join semantics in future releases. For now, I guess
you need to apply this workaround.


-Matthias





On 5/6/17 9:46 PM, Ofir Sharony wrote:
> Max,
> Thanks for your detailed answer. Couple of comments/questions:
> 
>    1. When performing caching of a KTable in order to reduce the amount of
>    duplicates, as you mentioned, it doesn't provide a 100% solution. From the
>    docs:
>    *"The semantics of caching is that data is flushed to the state store
>    and forwarded to the next downstream processor node whenever the earliest
>    of commit.interval.ms or cache.max.bytes.buffering (cache pressure)
>    hits".*
>    I want to suggest an implementation improvement regarding the time
>    threshold. When using the cache, I would have liked to see a behavior that
>    flushes records a certain time after they were cached (like a cache TTL).
>    In the current implementation, events arriving very close to the next
>    *commit.interval* will be flushed very quickly, thus subsequent events
>    with the same key will necessarily create duplicates downstream (the two
>    settings involved are sketched below, after this list).
> 
>    2. In regards to converting the resulting KStream to a KTable - If I
>    understand correctly, this will result in an ever-updating state store
>    (and the same for its backing changelog topic). Do these ever clean
>    themselves up, or do they live forever and thus grow indefinitely?
>    In my scenario, the KStream result of my join represents request logs,
>    which are not really suited to be seen as a KTable, as every request is
>    unique.
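>
>    A minimal sketch of the two settings in play (constants from
>    StreamsConfig; values illustrative):
>
>        final Properties props = new Properties();
>        // commit (and thus flush/forward cached records) every 10 seconds
>        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>        // bytes buffered across all caches before an early flush
>        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);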
> 
> Thanks,
> Ofir.
> 
> *Ofir Sharony*
> BackEnd Tech Lead
> 
> Mobile: +972-54-7560277 | ofir.sharony@myheritage.com | www.myheritage.com
> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
> 
> 
> 
> On Fri, May 5, 2017 at 8:15 PM, Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> Not sure if I can follow (and what type of join you have in mind).
>>
>> I assume we are still talking about windowed KStream-KStream joins.
>>
>> For this scenario, and an inner join, the computation is correct as
>> implemented. Only left/outer joins have a single flaw: they might
>> compute a "false" result, as they might emit both <key: x-null> and
>> <key: x-y>, and the <key: x-null> does not align with what you would
>> expect thinking in terms of SQL. (For example: if <key: x> arrives on
>> the left with no right-side match yet, the outer join emits <key: x-null>
>> right away; if <key: y> then arrives on the right within the window, it
>> also emits <key: x-y>, so downstream sees both results.)
>>
>> However, SQL is based on batch processing thus semantics cannot be
>> applied 1-to-1 to stream processing.
>>
>> Furthermore, a single record might join multiple times (also for inner
>> joins) within a window. Thus, I am not sure what you mean by a '1 time'
>> configuration within the retention period?
>>
>> Eager to get more details about this idea -- I like joins a lot and want
>> to make them better out-of-the-box -- I am not happy about the current
>> "false" left/outer join results but did not have a good idea about how
>> to resolve it. Just delaying only mitigates the problem but does not
>> resolve it completely.
>>
>>
>> -Matthias
>>
>>
>> On 5/5/17 6:54 AM, Adrian McCague wrote:
>>> Hi Matthias
>>>
>>> We have been thinking about this problem recently and thought: wouldn't
>>> it be nice if a join could be configured to be '1 time' within the
>>> retention period of the join window? So if a join has already occurred on a
>>> particular key, further ones would be ignored for the remainder of the
>>> retention period (or maybe the retention is reset). Assuming I have not
>>> missed a feature on the current DSL.
>>>
>>> I suppose the implementation would not be much different from putting a
>>> transform after the join, as a second state store would be required?
>>>
>>> Adrian
>>>
>>> -----Original Message-----
>>> From: Matthias J. Sax [mailto:matthias@confluent.io]
>>> Sent: 04 May 2017 23:55
>>> To: users@kafka.apache.org
>>> Subject: Re: Deduplicating KStream-KStream join
>>>
>>> Hi,
>>>
>>> we don't believe in triggers ;)
>>> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/
>>>
>>> -> Thus, it's a BigQuery flaw to not support updates... (IMHO)
>>>
>>> (We are also considering improving KStream-KStream join though, but
>>> that's of course no short-term solution for you:
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions)
>>>
>>> The simplest way to work around this within Streams would be to use a
>>> KTable -- but it might not be 100% guaranteed that all duplicates get
>>> filtered (as we flush on commit) -- you only get a very high probability:
>>> http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
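>>>
>>> As a minimal sketch of that FAQ trick (0.10.x DSL; names illustrative),
>>> given the joined KStream<String, String>:
>>>
>>>     // later records for the same key overwrite earlier ones; the cache
>>>     // then swallows most (but not all) duplicates before forwarding
>>>     KTable<String, String> deduped = joined
>>>         .groupByKey()
>>>         .reduce((oldValue, newValue) -> newValue, "dedup-store");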
>>>
>>>
>>> However, building a de-duplication step yourself is basically not so hard.
>>> Just use a #transform() with a state store. Put all results into the store;
>>> thus, duplicates "overwrite" older (false) results.
>>>
>>> The tricky part is only to decide how long you want to delay your
>>> output before you evict a result from your state and send it downstream. As
>>> it is stream processing, there might always be a late-arriving record, and
>>> thus, if you decide that a <key, x-null> result should be evicted from your
>>> store and sent to BigQuery, you might still be wrong, and later a <key,
>>> x-y> update might arrive in your de-duplication filter...
>>>
>>> -> this problem is the actual reason why we don't believe in triggers
>>> -- it's a simplification to assume that you can trigger at some point
>>> and get a final result (but you can never be sure...) -- we think a
>>> cleaner solution is the ability to handle "change" (ie, updates) end-to-end.
>>>
>>> Thus, the "cleanest" solution would actually be to make your downstream
>>> applications aware of "late updates". As you can't do anything about
>>> BigQuery, you could let your downstream application deal with it -- ie,
>>> when you query BigQuery, filter those result records with some SQL magic.
>>> Not sure how feasible this approach would be for your use case.
>>> If that does not work, you can still go with the #transform() step if you
>>> can define a good policy for how long to wait for late data.
>>>
>>>
>>> Hope this helps.
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>>
>>> On 5/4/17 8:26 AM, Ofir Sharony wrote:
>>>> Hi guys,
>>>>
>>>> I want to perform a join between two KStreams.
>>>> An event may appear on only one of the streams (either one of them),
>>>> so I can't use an inner join (which emits only on a match) or a left join
>>>> (which emits only when the left input arrives).
>>>> This leaves me with an outer join. The problem with the outer join is
>>>> that it emits on every record arrival, which creates duplicates at the
>>>> output node.
>>>>
>>>> My downstream application is BigQuery, which doesn't support updates
>>>> and thus can't do the dedup by itself.
>>>> What is the best practice implementing deduplication in KafkaStreams,
>>>> keeping only the latest, most updated record?
>>>> Is it possible to emit a record only after some time has passed, or
>>>> upon a certain trigger?
>>>>
>>>> Thanks.
>>>>
>>>> *Ofir Sharony*
>>>> BackEnd Tech Lead
>>>>
>>>> Mobile: +972-54-7560277 | ofir.sharony@myheritage.com | www.myheritage.com
>>>> MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel
>>>>
>>>>
>>>
>>
>>
> 

