kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: Understanding ReadOnlyWindowStore.fetch
Date Wed, 29 Mar 2017 23:53:04 GMT
I remain more than mystified by the workings of the StateStore. I tried
making aggregations with a 1-minute window, a 10-second advance, and a _12-hour_
retention (which is longer than the retention.ms of the topic). I still
couldn't get more than a 15% hit rate on the StateStore.

Are there configuration settings? Is there some properties file to set up
RocksDB? I'm not getting any errors, just not getting any data.
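With the configuration described above (1-minute windows advancing every 10 seconds), each record is added to several overlapping windows, and each of those windows has its own start time. A minimal plain-Java model of hopping-window assignment follows, assuming windows are aligned to the epoch as Kafka Streams' `TimeWindows` does. The class and method names are hypothetical; this is a model of the semantics, not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of hopping-window assignment (not the Kafka Streams API).
class HoppingWindows {
    /** Returns the start times (ms) of every [start, start + sizeMs) window containing timestampMs. */
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        // Windows are aligned to the epoch: round (timestamp - size + advance) down
        // to a multiple of the advance to get the earliest window that still covers it.
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 60 * 1000L;           // 1-minute windows
        long advanceMs = 10 * 1000L;        // advancing every 10 seconds
        long recordTs = 1_490_831_584_000L; // arbitrary example record timestamp
        List<Long> starts = windowStartsFor(recordTs, sizeMs, advanceMs);
        System.out.println(starts.size() + " windows, starts: " + starts);
    }
}
```

With a 10-second advance every record lands in six windows, so a lookup has to target the start time of the specific window wanted, or a range that covers it.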

On Wed, Mar 29, 2017 at 12:52 PM, Jon Yeargers <jon.yeargers@cedexis.com>
wrote:

> So is '.until()' based on clock time, elapsed time (i.e., record age), or
> something else?
>
> I'm seeing lots of records come through that can't be found in the store.
> Are these 'old' and already expired?
>
> Going forward, it would be useful to have different forms of '.until()' so
> one could consume old records (e.g., when catching up from lag) without
> having to worry about them immediately disappearing.
>
> On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Jon,
>>
>> You should be able to query anything that has not expired, i.e., based on
>> TimeWindows.until(..).
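A simplified, hypothetical model of that expiry follows. It assumes retention set via `TimeWindows.until(..)` is measured against record timestamps (stream time) rather than wall-clock time, and treats a window as expired once its start falls more than the retention period behind the largest record timestamp seen so far. The real RocksDB-backed store drops data a whole segment at a time, so it can keep windows somewhat longer than this model suggests. All names here are illustrative.

```java
import java.util.TreeMap;

// Simplified, hypothetical model: retention is driven by record timestamps, not wall-clock time.
class StreamTimeRetention {
    private final long retentionMs;
    private long streamTimeMs = Long.MIN_VALUE; // largest record timestamp seen so far
    private final TreeMap<Long, String> windowsByStart = new TreeMap<>();

    StreamTimeRetention(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(long windowStartMs, long recordTimestampMs, String value) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        windowsByStart.put(windowStartMs, value);
        // Expire every window whose start is more than retentionMs behind stream time.
        // (A late record for an already-expired window is therefore dropped right away.)
        windowsByStart.headMap(streamTimeMs - retentionMs).clear();
    }

    boolean contains(long windowStartMs) {
        return windowsByStart.containsKey(windowStartMs);
    }

    public static void main(String[] args) {
        long hourMs = 60 * 60 * 1000L;
        StreamTimeRetention store = new StreamTimeRetention(12 * hourMs); // like until(12 hours)
        long t0 = 1_490_000_000_000L;
        store.put(t0, t0, "first");
        System.out.println("after first record: " + store.contains(t0));
        // A record 13 hours newer advances stream time, even if it arrives a second later.
        store.put(t0 + 13 * hourMs, t0 + 13 * hourMs, "later");
        System.out.println("after newer record: " + store.contains(t0));
    }
}
```

In this model, replaying old records in timestamp order does not expire them immediately; what matters is how far the newest record timestamp has moved past a window's start.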
>>
>> Thanks,
>> Damian
>>
>> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yeargers@cedexis.com>
>> wrote:
>>
>> > To be a bit more specific:
>> >
>> > If I call this:
>> >
>> >         KTable<Windowed<String>, String> kt =
>> >                 sourceStream.groupByKey().reduce(..., "somekeystore");
>> >
>> > and then call this:
>> >
>> >         kt.foreach((key, value) -> ...);
>> >
>> > Can I assume that everything that comes out will be available in
>> > "somekeystore"? If not, what subset should I expect to find there?
>> >
>> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yeargers@cedexis.com>
>> > wrote:
>> >
>> > > But if a key shows up in a KTable->forEach, should it be available in the
>> > > StateStore (from the KTable)?
>> > >
>> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <michael@confluent.io>
>> > > wrote:
>> > >
>> > >> Jon,
>> > >>
>> > >> there's a related example, using a window store and a key-value store,
>> > >> at
>> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
>> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
>> > >>
>> > >> -Michael
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yeargers@cedexis.com>
>> > >> wrote:
>> > >>
>> > >> > I'm only running one instance (locally) to keep things simple.
>> > >> >
>> > >> > Reduction:
>> > >> >
>> > >> >         KTable<Windowed<String>, String> hourAggStore =
>> > >> >                 sourceStream.groupByKey().reduce(rowReducer,
>> > >> >                 TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
>> > >> >                 "HourAggStore");
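Some worked arithmetic for the hour aggregation above, under a simplified retention model (assumed here: a window expires once its start is more than the retention period behind the latest record timestamp). With 65-minute windows advancing by 5 minutes, each record updates 13 windows, and a 70-minute retention leaves a window queryable for roughly 5 minutes of record time after it closes. In practice the store drops whole segments, so data may survive somewhat longer. The helper names are hypothetical.

```java
// Worked arithmetic for the hour aggregation, under the simplified retention model
// (a window expires once its start is more than the retention behind stream time).
class HourWindowMath {
    static final long MINUTE_MS = 60 * 1000L;
    static final long SIZE_MS = 65 * MINUTE_MS;      // TimeWindows.of(65 min)
    static final long ADVANCE_MS = 5 * MINUTE_MS;    // .advanceBy(5 min)
    static final long RETENTION_MS = 70 * MINUTE_MS; // .until(70 min)

    /** Overlapping windows each record is added to (size is a multiple of the advance here). */
    static long windowsPerRecord(long sizeMs, long advanceMs) {
        return sizeMs / advanceMs;
    }

    /** Window ends at start + size and expires after start + retention; the gap is the difference. */
    static long queryableAfterCloseMs(long sizeMs, long retentionMs) {
        return retentionMs - sizeMs;
    }

    public static void main(String[] args) {
        System.out.println("windows per record: " + windowsPerRecord(SIZE_MS, ADVANCE_MS));
        System.out.println("minutes queryable after close: "
                + queryableAfterCloseMs(SIZE_MS, RETENTION_MS) / MINUTE_MS);
    }
}
```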
>> > >> >
>> > >> > then I get values to look for via:
>> > >> >
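>> > >> >         hourAggStore.foreach((k, v) -> {
>> > >> >                 try {
>> > >> >                         // readValue throws a checked IOException, which a lambda can't rethrow
>> > >> >                         LogLine logLine = objectMapper.readValue(v, LogLine.class);
>> > >> >                         LOGGER.debug("{}", k.key());
>> > >> >                 } catch (IOException e) {
>> > >> >                         LOGGER.warn("Could not parse value for {}", k.key(), e);
>> > >> >                 }
>> > >> >         });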
>> > >> >
>> > >> > I've kept it easy by requesting everything from 0 to
>> > >> > 'System.currentTimeMillis()'. Retrieval is done using a snippet from your
>> > >> > sample code, "windowedByKey".
>> > >> >
>> > >> > Requests are sent in via curl and output through the same channel. I pass
>> > >> > in the key and ask for any values.
>> > >> >
>> > >> > I've looked at the values passed into and out of the reduction function,
>> > >> > and they look sane.
>> > >> >
>> > >> > My assumption is that if a value shows up in the 'forEach' loop, it
>> > >> > exists in the StateStore. Is that accurate?
>> > >> >
>> > >> > In fact, only about one in 10 requests actually returns any values. No
>> > >> > errors, just no data.
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian.guy@gmail.com>
>> > >> > wrote:
>> > >> >
>> > >> > > Hi Jon,
>> > >> > >
>> > >> > > If you are able to get a handle on the store, i.e., via
>> > >> > > KafkaStreams.store(...), and call fetch without any exceptions, then
>> > >> > > the store is available.
>> > >> > > The time params to fetch are the boundaries to search for windows for
>> > >> > > the given key. They relate to the start time of the window, so if you
>> > >> > > call fetch(key, t1, t2), it will find all the windows for key that
>> > >> > > start in the inclusive time range t1 to t2.
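The fetch semantics described above (window start within the inclusive range t1 to t2) can be modelled with a sorted map keyed by window start. This is a plain-Java sketch of the semantics only, not the store's implementation, and the names are hypothetical. Because the range filters on window start time alone, a range that overlaps a window without containing its start returns nothing.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of one key's entries in a window store, indexed by window start time.
class FetchSemantics {
    /** Windows whose START time lies in the inclusive range [timeFrom, timeTo]. */
    static NavigableMap<Long, String> fetch(NavigableMap<Long, String> windowsByStart,
                                            long timeFrom, long timeTo) {
        return windowsByStart.subMap(timeFrom, true, timeTo, true);
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> windows = new TreeMap<>();
        windows.put(0L, "w0");      // window [0, 60000)
        windows.put(10_000L, "w1"); // window [10000, 70000)
        windows.put(20_000L, "w2"); // window [20000, 80000)

        // Both bounds are inclusive, so starts 0 and 10000 match.
        System.out.println(fetch(windows, 0L, 10_000L).keySet());      // [0, 10000]
        // Overlaps all three windows but contains none of their starts.
        System.out.println(fetch(windows, 50_000L, 70_000L).keySet()); // []
    }
}
```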
>> > >> > >
>> > >> > > Are you running more than one instance? If yes, then you want to make
>> > >> > > sure that you are querying the correct instance. For that you can use
>> > >> > > KafkaStreams.metadataForKey(...) to find the instance that has the key
>> > >> > > you are looking for.
>> > >> > >
>> > >> > > Thanks,
>> > >> > > Damian
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yeargers@cedexis.com>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > I'm probing around, trying to find a way to solve my aggregation ->
>> > >> > > > DB issue. Looking at the '.fetch()' function, I'm wondering about the
>> > >> > > > 'timeFrom' and 'timeTo' params, as not a lot is mentioned about their
>> > >> > > > proper usage.
>> > >> > > >
>> > >> > > > The test in
>> > >> > > >
>> > >> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
>> > >> > > > makes it appear that the params are boundaries and that it will return
>> > >> > > > an inclusive list of every key/window combination. Is that true?
>> > >> > > > My tests to this end haven't returned anything.
>> > >> > > >
>> > >> > > > I'm watching the values coming out of the KTable<Windowed<String>, String>
>> > >> > > > so I can send them back as request params. What I've tried:
>> > >> > > >
>> > >> > > > - Window.key(), Window.key().start() and Window.key().end()
>> > >> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
>> > >> > > > - Window.key(), 0 and Window.key().end()
>> > >> > > > - Window.key(), 0 and (Window.key().end() + 1)
>> > >> > > >
>> > >> > > > None of these seem to hit anything in the StateStore.
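Under the semantics in Damian's reply quoted above (a window matches when its start lies in the inclusive range), each of the ranges in the list above contains the window's start time, so the ranges themselves should not be what causes the misses. A quick check, with a hypothetical helper and an arbitrary example window:

```java
// Checks each query range from the list above against "window start in [timeFrom, timeTo]".
class TriedRanges {
    static boolean matches(long windowStart, long timeFrom, long timeTo) {
        return windowStart >= timeFrom && windowStart <= timeTo;
    }

    public static void main(String[] args) {
        long start = 1_490_740_000_000L;    // hypothetical window start (ms)
        long end = start + 65 * 60 * 1000L; // 65-minute window
        long[][] ranges = {
            {start, end},         // start() and end()
            {start - 1, end + 1}, // start() - 1 and end() + 1
            {0L, end},            // 0 and end()
            {0L, end + 1},        // 0 and end() + 1
        };
        for (long[] r : ranges) {
            System.out.println("[" + r[0] + ", " + r[1] + "] matches: " + matches(start, r[0], r[1]));
        }
    }
}
```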
>> > >> > > >
>> > >> > > > Is there a delay before Store values become available for '.fetch()'?
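Per Damian's reply, once `KafkaStreams.store(...)` returns without an exception, the store is available. During startup or a rebalance that call can throw `InvalidStateStoreException`, so a common pattern is to retry it. A minimal, generic sketch follows; the helper is hypothetical, and in a real application the supplier would wrap `KafkaStreams.store(...)` and the catch would target `InvalidStateStoreException` rather than every `RuntimeException`.

```java
import java.util.function.Supplier;

// Generic retry sketch; the helper name is hypothetical.
class RetryUntilAvailable {
    static <T> T retry(Supplier<T> supplier, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.get();
            } catch (RuntimeException e) { // real code: catch InvalidStateStoreException
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                    throw new IllegalStateException("interrupted while waiting for the store", ie);
                }
            }
        }
        throw last; // every attempt failed: rethrow the most recent failure
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated store lookup that only succeeds on the third call.
        String handle = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("store not ready yet");
            }
            return "store handle";
        }, 5, 100L);
        System.out.println(handle + " after " + calls[0] + " attempts");
    }
}
```

Note that a successful handle only means the store is queryable; whether a given window is still in it depends on retention.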
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>
