kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Possible bug? Duplicates when searching kafka stream state store with caching
Date Fri, 06 Jul 2018 21:06:24 GMT
Yes, please create a JIRA reporting this. The `all()` and `fetchAll()`
source code has not been modified since it was first added to the
ReadOnlyWindowStore API, so a lurking bug is likely causing the issue.
Please also attach your code and sample data, if possible, to help us
reproduce and investigate the issue further.



Guozhang


On Fri, Jul 6, 2018 at 7:41 AM, Christian Henry <christian.henry92@gmail.com> wrote:

> Any other ideas here? Should I create a bug?
>
> On Tue, Jul 3, 2018 at 1:21 PM, Christian Henry <christian.henry92@gmail.com> wrote:
>
> > Nope, we're setting retainDuplicates to false.
> >
> > On Tue, Jul 3, 2018 at 6:55 AM, Damian Guy <damian.guy@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> When you create your window store, do you have `retainDuplicates` set to
> >> `true`? That is, assuming you use `Stores.persistentWindowStore(...)`, is
> >> the last parameter `true`?
> >>
> >> Thanks,
> >> Damian
> >>
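To make Damian's question concrete, here is a minimal stdlib-only model, assuming the simplification that a window-store entry is identified by (key, timestamp). With `retainDuplicates` false, a second put for the same key and timestamp replaces the first; with it true, both values are kept. `WindowStoreModel` and its methods are hypothetical illustrations, not the Kafka Streams API or its on-disk format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of duplicate handling in a window store.
// Entries are identified by (key, timestamp); this is a simplification.
class WindowStoreModel {
    private final boolean retainDuplicates;
    // key -> (timestamp -> values stored at that timestamp)
    private final Map<String, TreeMap<Long, List<String>>> data = new TreeMap<>();

    WindowStoreModel(boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    void put(String key, String value, long timestamp) {
        List<String> values = data
            .computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(timestamp, t -> new ArrayList<>());
        if (!retainDuplicates) {
            values.clear(); // same key and timestamp: overwrite the old value
        }
        values.add(value); // with retainDuplicates, every put is kept
    }

    // Returns all values for key with timestamps in [from, to], in time order.
    List<String> fetch(String key, long from, long to) {
        List<String> result = new ArrayList<>();
        TreeMap<Long, List<String>> byTime = data.get(key);
        if (byTime == null) {
            return result;
        }
        for (List<String> values : byTime.subMap(from, true, to, true).values()) {
            result.addAll(values);
        }
        return result;
    }

    public static void main(String[] args) {
        WindowStoreModel dedup = new WindowStoreModel(false);
        WindowStoreModel keepAll = new WindowStoreModel(true);
        for (WindowStoreModel store : List.of(dedup, keepAll)) {
            store.put("11111", "first", 1000L);
            store.put("11111", "second", 1000L);
        }
        System.out.println(dedup.fetch("11111", 0L, 2000L));   // [second]
        System.out.println(keepAll.fetch("11111", 0L, 2000L)); // [first, second]
    }
}
```

In this model, a single key can come back twice for the same timestamp only when `retainDuplicates` is true, which is why that flag is the first thing worth ruling out.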
> >> On Mon, 2 Jul 2018 at 17:29 Christian Henry <christian.henry92@gmail.com> wrote:
> >>
> >> > We're using the latest Kafka (1.1.0). I'd like to note that when we
> >> > encounter duplicates, the window is the same as well.
> >> >
> >> > My original code was a bit simplified: we also insert into the store
> >> > when iterator.hasNext() is true, before returning null. We're using a
> >> > window store because we have a punctuator that runs every few minutes to
> >> > count GUIDs with similar metadata and report the count in a healthcheck.
> >> > Since our healthcheck window is shorter than the store's retention period
> >> > (the retention period might be 1 hour; the healthcheck window is ~5 min),
> >> > the window store seemed like a good way to efficiently query all of the
> >> > most recent data. Because the healthcheck punctuator needs to aggregate
> >> > over all the recent values, it has to call *fetchAll(start, end)*, which
> >> > is how these duplicates are affecting us.
> >> >
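The healthcheck described above amounts to scanning every entry whose timestamp falls in [start, end] and grouping by metadata. A minimal stdlib sketch, assuming each scanned entry can be reduced to a hypothetical (timestamp, guid, group) triple, where `group` stands in for "similar metadata". Counting distinct GUIDs per group, rather than raw entries, is a design choice of this sketch; it also makes the count insensitive to the same entry being returned twice.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HealthcheckSketch {
    // Hypothetical flattened view of one window-store entry.
    static final class Entry {
        final long timestamp;
        final String guid;
        final String group; // stand-in for "similar metadata"

        Entry(long timestamp, String guid, String group) {
            this.timestamp = timestamp;
            this.guid = guid;
            this.group = group;
        }
    }

    // Counts distinct GUIDs per group among entries with timestamp in [start, end].
    static Map<String, Integer> countByGroup(List<Entry> entries, long start, long end) {
        Map<String, Set<String>> guidsByGroup = new HashMap<>();
        for (Entry e : entries) {
            if (e.timestamp >= start && e.timestamp <= end) {
                guidsByGroup.computeIfAbsent(e.group, g -> new HashSet<>()).add(e.guid);
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        guidsByGroup.forEach((group, guids) -> counts.put(group, guids.size()));
        return counts;
    }

    public static void main(String[] args) {
        long now = 60 * 60 * 1000L;   // 1 hour, in ms
        long window = 5 * 60 * 1000L; // ~5 minute healthcheck window
        List<Entry> entries = List.of(
            new Entry(now - 1_000, "11111", "A"),
            new Entry(now - 1_000, "11111", "A"), // same entry returned twice
            new Entry(now - 2_000, "22222", "A"),
            new Entry(now - 3_000, "33333", "B"),
            new Entry(now - window - 1, "44444", "B")); // outside the window
        // counts: A -> 2, B -> 1
        System.out.println(countByGroup(entries, now - window, now));
    }
}
```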
> >> > On Fri, Jun 29, 2018 at 7:32 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >> >
> >> > > Hello Christian,
> >> > >
> >> > > Since you are calling fetch(key, start, end), I'm assuming that
> >> > > duplicateStore is a WindowedStore. With a windowed store, a single key
> >> > > can fall into multiple windows and hence be returned more than once
> >> > > from the WindowStoreIterator; note that its type is <Windowed<K>, V>.
> >> > >
> >> > > So I'd first like to know:
> >> > >
> >> > > 1) Which Kafka version are you using?
> >> > > 2) Why do you need a window store? If you do, could you consider using
> >> > > the single-point fetch (added in KAFKA-6560) instead of the range query
> >> > > (which is also more expensive)?
> >> > >
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
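To illustrate Guozhang's two points, here is a stdlib-only model, assuming entries for each key are kept sorted by timestamp. A range fetch over [from, to] returns one entry per stored timestamp, so a key written at several times comes back several times, while a single-point fetch looks up exactly one timestamp. `TimeIndexedStore`, `fetchRange`, and `fetchAt` are hypothetical names; the single-point fetch referenced as KAFKA-6560 is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TimeIndexedStore {
    // key -> (timestamp -> value), sorted by timestamp
    private final Map<String, TreeMap<Long, String>> data = new HashMap<>();

    void put(String key, long timestamp, String value) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Range query: every entry for key with timestamp in [from, to].
    // A key written at several timestamps is returned once per timestamp.
    List<Map.Entry<Long, String>> fetchRange(String key, long from, long to) {
        TreeMap<Long, String> byTime = data.get(key);
        if (byTime == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(byTime.subMap(from, true, to, true).entrySet());
    }

    // Point query: the single value stored at exactly this timestamp, or null.
    String fetchAt(String key, long timestamp) {
        TreeMap<Long, String> byTime = data.get(key);
        return byTime == null ? null : byTime.get(timestamp);
    }

    public static void main(String[] args) {
        TimeIndexedStore store = new TimeIndexedStore();
        store.put("11111", 1_000L, "m1");
        store.put("11111", 2_000L, "m2");
        System.out.println(store.fetchRange("11111", 0L, 5_000L).size()); // 2
        System.out.println(store.fetchAt("11111", 2_000L));               // m2
    }
}
```

A point lookup only fits when the exact timestamp is known; the duplicate check in this thread searches a whole time range, which is why it uses the range form.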
> >> > > On Fri, Jun 29, 2018 at 11:38 AM, Christian Henry <christian.henry92@gmail.com> wrote:
> >> > >
> >> > > > Hi all,
> >> > > >
> >> > > > I'll first describe a simplified view of the relevant parts of our
> >> > > > setup (which should be enough to repro), then describe the behavior
> >> > > > we're seeing, and then note some information I've come across after
> >> > > > digging in a bit.
> >> > > >
> >> > > > We have a Kafka Streams application, and one of our transform steps
> >> > > > keeps a state store to filter out messages with a previously seen
> >> > > > GUID. That is, our transform looks like:
> >> > > >
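> >> > > > public KeyValue<byte[], String> transform(byte[] key, String guid) {
> >> > > >     // Look for an earlier entry for this GUID in [start, now];
> >> > > >     // start and now are computed elsewhere (not shown).
> >> > > >     try (WindowStoreIterator<DuplicateMessageMetadata> iterator =
> >> > > >              duplicateStore.fetch(guid, start, now)) {
> >> > > >         if (iterator.hasNext()) {
> >> > > >             return null; // seen before: drop the message
> >> > > >         } else {
> >> > > >             // metadata: "some metadata" (elided in this simplified view)
> >> > > >             duplicateStore.put(guid, metadata);
> >> > > >             return new KeyValue<>(key, guid); // first sighting: forward
> >> > > >         }
> >> > > >     }
> >> > > > }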
> >> > > >
> >> > > > where the duplicateStore is a persistent windowed store with caching
> >> > > > enabled.
> >> > > >
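The deduplication logic above can be exercised without Kafka. This sketch assumes a hypothetical `Deduplicator` that keeps, per GUID, the timestamps at which it was recorded, and treats a GUID as a duplicate if it was recorded within a fixed lookback window ending at the current timestamp. It mirrors the `fetch(guid, start, now)` plus `hasNext()` check from the simplified transform, not Kafka's caching layer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

class Deduplicator {
    private final long lookbackMs;
    // guid -> timestamps at which that guid was recorded
    private final Map<String, TreeSet<Long>> seen = new HashMap<>();

    Deduplicator(long lookbackMs) {
        this.lookbackMs = lookbackMs;
    }

    // Returns true if the message should be forwarded, false if it is a duplicate.
    boolean accept(String guid, long now) {
        TreeSet<Long> times = seen.computeIfAbsent(guid, g -> new TreeSet<>());
        long start = now - lookbackMs;
        // Equivalent of fetch(guid, start, now) followed by iterator.hasNext()
        boolean duplicate = !times.subSet(start, true, now, true).isEmpty();
        if (!duplicate) {
            times.add(now); // equivalent of duplicateStore.put(guid, ...)
        }
        return !duplicate;
    }

    public static void main(String[] args) {
        Deduplicator dedup = new Deduplicator(60_000L); // 1 minute lookback
        System.out.println(dedup.accept("11111", 1_000L));  // true: first sighting
        System.out.println(dedup.accept("11111", 5_000L));  // false: seen 4 s earlier
        System.out.println(dedup.accept("11111", 90_000L)); // true: outside lookback
    }
}
```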
> >> > > > I was debugging some tests and found that sometimes, when calling
> >> > > > *all()* or *fetchAll()* on the duplicate store and stepping through
> >> > > > the iterator, it would return the same GUID more than once, even if it
> >> > > > was only inserted into the store once. More specifically, if I sent the
> >> > > > following GUIDs to the stream: [11111, 22222, ... 99999] (9 values
> >> > > > total), it would sometimes return 10 values, with one (or more) of the
> >> > > > values returned twice by the iterator. However, this would not show up
> >> > > > with a *fetch(guid)* on that specific GUID. For instance, if 11111 was
> >> > > > being returned twice by *fetchAll()*, calling
> >> > > > *duplicateStore.fetch("11111", start, end)* would still return an
> >> > > > iterator with a size of 1.
> >> > > >
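For a reproduction to attach to the JIRA, the check above can be made mechanical. A minimal sketch, assuming the keys returned by an `all()` or `fetchAll()` scan have already been collected into a list of strings (the collection step is not shown and depends on the store's key type): report every key that appears more than once. The sample input mirrors the 9-GUID scenario described above and is illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DuplicateCheck {
    // Returns the keys that occur more than once, in first-seen order.
    static List<String> duplicatedKeys(List<String> scannedKeys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : scannedKeys) {
            counts.merge(key, 1, Integer::sum);
        }
        List<String> dups = new ArrayList<>();
        counts.forEach((k, n) -> {
            if (n > 1) {
                dups.add(k);
            }
        });
        return dups;
    }

    public static void main(String[] args) {
        // 9 GUIDs inserted once each, but the scan returned 10 keys.
        List<String> scanned = List.of("11111", "22222", "33333", "44444", "55555",
            "66666", "66666", "77777", "88888", "99999");
        System.out.println(duplicatedKeys(scanned)); // [66666]
    }
}
```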
> >> > > > I dug into this a bit more by setting a breakpoint in
> >> > > > *SegmentedCacheFunction#compareSegmentedKeys(cacheKey, storeKey)* and
> >> > > > watching the two input values as I looped through the iterator using
> >> > > > "*while (iterator.hasNext()) { print(iterator.next()) }*". In one test,
> >> > > > the duplicate value was 66666, and I saw the following behavior
> >> > > > (trimming off the segment values from the byte input):
> >> > > > -- compareSegmentedKeys(cacheKey = 66666, storeKey = 22222)
> >> > > > -- next() returns 66666
> >> > > > and
> >> > > > -- compareSegmentedKeys(cacheKey = 77777, storeKey = 66666)
> >> > > > -- next() returns 66666
> >> > > > Apart from those, the two input values were equal and the output was
> >> > > > as expected. Additionally, a coworker noted that the number of
> >> > > > duplicates always matches the number of times
> >> > > > *Long.compare(cacheSegmentId, storeSegmentId)* returns a non-zero
> >> > > > value, indicating that the duplicates are likely arising from the
> >> > > > segment comparison.
> >> > > >
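The coworker's observation can be illustrated with a toy merge that compares (segmentId, key) pairs, the ordering described for `compareSegmentedKeys` above. This sketch assumes each layer derives a segment ID as timestamp / segmentInterval and that the cache and store could, hypothetically, use different intervals; it is a simplified merge over two sorted lists, not Kafka's merged cache/store iterator. When both layers agree on the segment ID, an entry present in both is emitted once; when they disagree, the comparison never returns 0 for that entry and it is emitted twice.

```java
import java.util.ArrayList;
import java.util.List;

class SegmentedMergeSketch {
    // Hypothetical entry as seen by one layer: key plus the segment id that layer computed.
    static final class Entry {
        final long segmentId;
        final String key;

        Entry(long segmentId, String key) {
            this.segmentId = segmentId;
            this.key = key;
        }
    }

    // Builds one layer's entries, sorted by (segmentId, key).
    static List<Entry> layer(String[] keys, long[] timestamps, long segmentInterval) {
        List<Entry> entries = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            entries.add(new Entry(timestamps[i] / segmentInterval, keys[i]));
        }
        entries.sort(SegmentedMergeSketch::compare);
        return entries;
    }

    // Segment id first, then key.
    static int compare(Entry a, Entry b) {
        int bySegment = Long.compare(a.segmentId, b.segmentId);
        return bySegment != 0 ? bySegment : a.key.compareTo(b.key);
    }

    // Merges two sorted layers; an entry that compares equal in both is emitted once.
    static List<String> merge(List<Entry> cache, List<Entry> store) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < cache.size() || j < store.size()) {
            int c = i == cache.size() ? 1
                  : j == store.size() ? -1
                  : compare(cache.get(i), store.get(j));
            if (c < 0) {
                out.add(cache.get(i++).key);
            } else if (c > 0) {
                out.add(store.get(j++).key);
            } else {
                out.add(cache.get(i++).key); // same entry in both layers: emit once
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"11111", "22222", "33333"};
        long[] ts = {1_000L, 61_000L, 121_000L};
        // Every entry is both cached and already flushed to the store.
        List<Entry> store = layer(keys, ts, 60_000L);
        // Matching intervals: [11111, 22222, 33333]
        System.out.println(merge(layer(keys, ts, 60_000L), store));
        // Mismatched intervals: [11111, 22222, 22222, 33333, 33333]
        System.out.println(merge(layer(keys, ts, 120_000L), store));
    }
}
```

In the mismatched run, the two duplicates line up with the two merge steps where `Long.compare` on the segment IDs is non-zero for the same key, which is consistent, in this toy model, with the coworker's count.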
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >
> >
>



-- 
-- Guozhang
