kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: Understanding ReadOnlyWindowStore.fetch
Date Wed, 29 Mar 2017 19:52:20 GMT
So is '.until()' based on wall-clock time, elapsed time (i.e., record age), or
something else?

I'm seeing lots of records come through that can't be found in the store. Are
these 'old' records that have already expired?

Going forward, it would be useful to have different forms of '.until()' so
that one could consume old records (e.g., when catching up from lag) without
having to worry about them disappearing immediately.
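A simplified model may help frame the question. It assumes, as we understand Kafka Streams window stores to behave, that '.until()' retention is measured against stream time (the largest record timestamp the store has seen) rather than the wall clock. It also ignores the segment granularity that makes real expiry approximate. RetentionModel, observe and isRetained are hypothetical names for this sketch, not Kafka Streams API.

```java
// Hypothetical, simplified model of '.until()' retention for a window store.
// ASSUMPTION: expiry is driven by stream time (largest record timestamp seen),
// not by the wall clock; segment granularity in the real store is ignored.
final class RetentionModel {
    private final long retentionMs;
    private long streamTime = 0L; // largest record timestamp seen so far (timestamps assumed >= 0)

    RetentionModel(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Processing a record can only move stream time forward. */
    void observe(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    /** True if a window starting at windowStart would still be kept under this model. */
    boolean isRetained(long windowStart) {
        return streamTime - windowStart < retentionMs;
    }

    public static void main(String[] args) {
        long min = 60 * 1000L;
        RetentionModel store = new RetentionModel(70 * min); // like .until(70 * 60 * 1000L)
        store.observe(60 * min);
        System.out.println(store.isRetained(0)); // stream time 60 min: window at 0 kept
        store.observe(80 * min);
        System.out.println(store.isRetained(0)); // stream time 80 min: window at 0 expired
        store.observe(10 * min); // a late record does not move stream time backwards
        System.out.println(store.isRetained(0)); // still expired
    }
}
```

Under this model, wall-clock age plays no part: only the timestamps of the records processed so far decide which windows are still kept.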

On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian.guy@gmail.com> wrote:

> Jon,
>
> You should be able to query anything that has not expired, i.e., based on
> TimeWindows.until(..).
>
> Thanks,
> Damian
>
> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yeargers@cedexis.com>
> wrote:
>
> > To be a bit more specific:
> >
> > If I call this:
> >
> >         KTable<Windowed<String>, String> kt =
> >                 sourceStream.groupByKey().reduce(..., "somekeystore");
> >
> > and then call this:
> >
> >         kt.foreach((key, value) -> ...);
> >
> > Can I assume that everything that comes out will be available in
> > "somekeystore"? If not, what subset should I expect to find there?
> >
> > On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> > wrote:
> >
> > > But if a key shows up in KTable.foreach(), should it be available in the
> > > StateStore (from the KTable)?
> > >
> > > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <michael@confluent.io>
> > > wrote:
> > >
> > >> Jon,
> > >>
> > >> There's a related example, using a window store and a key-value store, at
> > >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> > >> (this is for Confluent 3.2 / Kafka 0.10.2).
> > >>
> > >> -Michael
> > >>
> > >>
> > >>
> > >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yeargers@cedexis.com>
> > >> wrote:
> > >>
> > >> > I'm only running one instance (locally) to keep things simple.
> > >> >
> > >> > Reduction:
> > >> >
> > >> >         KTable<Windowed<String>, String> hourAggStore =
> > >> >                 sourceStream.groupByKey().reduce(rowReducer,
> > >> >                         TimeWindows.of(65 * 60 * 1000L)
> > >> >                                 .advanceBy(5 * 60 * 1000L)
> > >> >                                 .until(70 * 60 * 1000L),
> > >> >                         "HourAggStore");
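As a hedged illustration of what the reduction above writes into "HourAggStore", the sketch below re-implements epoch-aligned hopping-window assignment as we understand TimeWindows to perform it: windows are half-open [start, start + size) and start at multiples of the advance. With a 65-minute size and a 5-minute advance, each record updates 65 / 5 = 13 overlapping windows for its key. HoppingWindows and windowStartsFor are hypothetical names, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper modeling epoch-aligned hopping windows such as
// TimeWindows.of(65 min).advanceBy(5 min). Windows are half-open: [start, start + size).
final class HoppingWindows {
    static final long MINUTE = 60 * 1000L;

    /** Start times of every window (size sizeMs, hop advanceMs) that contains timestamp. */
    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        // Smallest multiple of advanceMs whose window end (start + size) is after timestamp.
        long start = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Long> starts = windowStartsFor(100 * MINUTE, 65 * MINUTE, 5 * MINUTE);
        System.out.println(starts.size() + " windows, starting at minutes "
                + starts.get(0) / MINUTE + " through " + starts.get(starts.size() - 1) / MINUTE);
    }
}
```

Because the windows overlap, a single key shows up in foreach once per updated window, each with its own Windowed key and window start.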
> > >> >
> > >> > then I get values to look for via:
> > >> >
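> > >> >         hourAggStore.foreach((k, v) -> {
> > >> >                 try {
> > >> >                         // readValue throws a checked IOException, which a lambda cannot rethrow
> > >> >                         LogLine logLine = objectMapper.readValue(v, LogLine.class);
> > >> >                 } catch (IOException e) {
> > >> >                         throw new UncheckedIOException(e);
> > >> >                 }
> > >> >                 LOGGER.debug("{}", k.key());
> > >> >         });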
> > >> >
> > >> > I've kept it easy by requesting everything from 0 to
> > >> > System.currentTimeMillis(). Retrieval is done using a snippet from your
> > >> > sample code, "windowedByKey".
> > >> >
> > >> > Requests are sent in via curl, and the output comes back through the same
> > >> > channel. I pass in the key and ask for any values.
> > >> >
> > >> > I've looked at the values passed into and out of the reduction function,
> > >> > and they look sane.
> > >> >
> > >> > My assumption is that if a value shows up in the foreach loop, it exists
> > >> > in the StateStore. Accurate?
> > >> >
> > >> > In fact, only about one in 10 requests actually returns any values. No
> > >> > errors, just no data.
> > >> >
> > >> >
> > >> >
> > >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian.guy@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Jon,
> > >> > >
> > >> > > If you are able to get a handle on the store, i.e., via
> > >> > > KafkaStreams.store(...), and call fetch without any exceptions, then the
> > >> > > store is available.
> > >> > > The time params to fetch are the boundaries to search for windows for the
> > >> > > given key. They relate to the start time of the window, so if you call
> > >> > > fetch(key, t1, t2), it will find all the windows for that key that start
> > >> > > within the inclusive time range [t1, t2].
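The start-time rule described here can be modeled with a sorted map keyed by window start. WindowFetchModel is a hypothetical in-memory stand-in for one key's entries, not the Kafka Streams store; it only illustrates that fetch(key, timeFrom, timeTo) selects windows whose start lies in the inclusive range [timeFrom, timeTo], regardless of where those windows end.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the windows of ONE key, indexed by window
// start time. Not the Kafka Streams implementation: it only models the rule that
// fetch(key, timeFrom, timeTo) matches windows whose START is in [timeFrom, timeTo].
final class WindowFetchModel {
    private final NavigableMap<Long, String> windowsByStart = new TreeMap<>();

    void put(long windowStart, String aggregate) {
        windowsByStart.put(windowStart, aggregate);
    }

    /** Windows whose start time lies in the inclusive range [timeFrom, timeTo]. */
    NavigableMap<Long, String> fetch(long timeFrom, long timeTo) {
        if (timeFrom > timeTo) {
            return new TreeMap<>(); // empty range; TreeMap.subMap would throw here
        }
        return windowsByStart.subMap(timeFrom, true, timeTo, true);
    }

    public static void main(String[] args) {
        long min = 60 * 1000L;
        WindowFetchModel store = new WindowFetchModel();
        store.put(40 * min, "agg-A"); // window [40, 105) minutes
        store.put(45 * min, "agg-B"); // window [45, 110) minutes
        store.put(50 * min, "agg-C"); // window [50, 115) minutes

        // Bounds on the start time: matches the windows starting at 40 and 45 min.
        System.out.println(store.fetch(40 * min, 45 * min).values());
        // Covers the ends of all three windows (105, 110, 115) but none of their starts.
        System.out.println(store.fetch(105 * min, 115 * min).isEmpty());
    }
}
```

Under this model, each of the four ranges listed in the original question includes the window's own start time, so a miss would point to the window not being in the store (for example, because it had expired) rather than to the choice of bounds.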
> > >> > >
> > >> > > Are you running more than one instance? If yes, then you want to make sure
> > >> > > that you are querying the correct instance. For that you can use
> > >> > > KafkaStreams.metadataForKey(...) to find the instance that has the key you
> > >> > > are looking for.
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yeargers@cedexis.com>
> > >> > > wrote:
> > >> > >
> > >> > > > I'm probing around, trying to find a way to solve my aggregation -> db
> > >> > > > issue. Looking at the '.fetch()' function, I'm wondering about the
> > >> > > > 'timeFrom' and 'timeTo' params, as not a lot is said about 'proper' usage.
> > >> > > >
> > >> > > > The test in
> > >> > > >
> > >> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> > >> > > > makes it appear that the params are boundaries and that it will return an
> > >> > > > inclusive list of every key/window combination. Truth?
> > >> > > >
> > >> > > > My tests to this end haven't returned anything.
> > >> > > >
> > >> > > > I'm watching the values coming out of the KTable<Windowed<String>, String>
> > >> > > > so I can send them back as request params. What I've tried:
> > >> > > >
> > >> > > > - Windowed.key(), Windowed.window().start() and Windowed.window().end()
> > >> > > > - Windowed.key(), (Windowed.window().start() - 1) and (Windowed.window().end() + 1)
> > >> > > > - Windowed.key(), 0 and Windowed.window().end()
> > >> > > > - Windowed.key(), 0 and (Windowed.window().end() + 1)
> > >> > > >
> > >> > > > None of these seem to hit anything in the StateStore.
> > >> > > >
> > >> > > > Is there a delay before store values become available to '.fetch()'?
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
