kafka-users mailing list archives

From Damian Guy <damian....@gmail.com>
Subject Re: Understanding ReadOnlyWindowStore.fetch
Date Wed, 29 Mar 2017 17:37:33 GMT
Jon,

You should be able to query anything that has not expired, i.e., based on
TimeWindows.until(..).

Thanks,
Damian

On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yeargers@cedexis.com> wrote:

> To be a bit more specific:
>
> If I call this:
>
>         KTable<Window, String> kt = sourceStream.groupByKey().reduce(..., "somekeystore");
>
> and then call this:
>
> kt.forEach()-> ...
>
> Can I assume that everything that comes out will be available in
> "somekeystore"? If not, what subset should I expect to find there?
>
> On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yeargers@cedexis.com>
> wrote:
>
> > But if a key shows up in a KTable->forEach should it be available in the
> > StateStore (from the KTable)?
> >
> > On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <michael@confluent.io>
> > wrote:
> >
> >> Jon,
> >>
> >> there's a related example, using a window store and a key-value store, at
> >> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
> >> (this is for Confluent 3.2 / Kafka 0.10.2).
> >>
> >> -Michael
> >>
> >>
> >>
> >> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yeargers@cedexis.com> wrote:
> >>
> >> > I'm only running one instance (locally) to keep things simple.
> >> >
> >> > Reduction:
> >> >
> >> >         KTable<Windowed<String>, String> hourAggStore =
> >> >             sourceStream.groupByKey().reduce(rowReducer,
> >> >                 TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
> >> >                 "HourAggStore");
> >> >
> >> > then I get values to look for via:
> >> >
> >> >         hourAggStore.foreach((k, v) -> {
> >> >                 LogLine logLine = objectMapper.readValue(v, LogLine.class);
> >> >                 LOGGER.debug("{}", k.key());
> >> >         });
> >> >
> >> > I've kept it easy by requesting everything from 0 to
> >> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
> >> > sample code "windowedByKey".
> >> >
> >> > Requests are sent in via curl and output through the same channel. I pass
> >> > in the key and ask for any values.
> >> >
> >> > I've looked at the values passed in / out of the reduction function and they
> >> > look sane.
> >> >
> >> > My assumption is that if a value shows up in the 'forEach' loop this
> >> > implies it exists in the StateStore. Accurate?
> >> >
> >> > In fact, only about one in 10 requests actually return any values. No
> >> > errors - just no data.
> >> >
> >> >
> >> >
> >> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian.guy@gmail.com> wrote:
> >> >
> >> > > Hi Jon,
> >> > >
> >> > > If you are able to get a handle on the store, i.e., via
> >> > > KafkaStreams.store(...) and call fetch without any exceptions, then the
> >> > > store is available.
> >> > > The time params to fetch are the boundaries to search for windows for the
> >> > > given key. They relate to the start time of the window, so if you did
> >> > > fetch(key, t1, t2) - it will find all the windows for key that start in the
> >> > > inclusive time range t1 - t2.
> >> > >
> >> > > Are you running more than one instance? If yes, then you want to make sure
> >> > > that you are querying the correct instance. For that you can use:
> >> > > KafkaStreams.metadataForKey(...) to find the instance that has the key you
> >> > > are looking for.
> >> > >
> >> > > Thanks,
> >> > > Damian
> >> > >
> >> > >
> >> > >
> >> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yeargers@cedexis.com> wrote:
> >> > >
> >> > > > I'm probing about trying to find a way to solve my aggregation -> db issue.
> >> > > > Looking at the '.fetch()' function I'm wondering about the 'timeFrom' and
> >> > > > 'timeTo' params as not a lot is mentioned about 'proper' usage.
> >> > > >
> >> > > > The test in
> >> > > >
> >> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
> >> > > > makes it appear that the params are boundaries and that it will return an
> >> > > > inclusive list of every key/window combination. Truth?
> >> > > >
> >> > > > My tests to this end haven't returned anything.
> >> > > >
> >> > > > I'm watching the values coming out of the KTable<Window, String> so I can
> >> > > > send them back as request params. What I've tried:
> >> > > >
> >> > > > - Window.key(), Window.key().start() and Window.key().end()
> >> > > > - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
> >> > > > - Window.key(), 0 and Window.key().end()
> >> > > > - Window.key(), 0 and (Window.key().end() + 1)
> >> > > >
> >> > > > None of these seem to hit anything in the StateStore.
> >> > > >
> >> > > > Is there a delay before Store values become available for '.fetch()'?
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
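
The fetch(key, t1, t2) semantics described in the thread (both bounds inclusive, matched against the window start time) can be illustrated with a small in-memory model. This is only a sketch and does not use the Kafka Streams API: the class WindowFetchSketch, its methods, and the record timestamp are hypothetical, and it assumes hopping windows aligned to the epoch with an exclusive end, sized like the 65-minute / 5-minute windows quoted above. Retention via until(..) is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified in-memory model of a window store for ONE key; NOT the Kafka Streams API.
public class WindowFetchSketch {
    static final long MINUTE_MS = 60 * 1000L;
    static final long SIZE_MS = 65 * MINUTE_MS;    // mirrors TimeWindows.of(65 * 60 * 1000L)
    static final long ADVANCE_MS = 5 * MINUTE_MS;  // mirrors .advanceBy(5 * 60 * 1000)

    // Start times of all hopping windows [start, start + SIZE_MS) containing the timestamp.
    // Assumes a non-negative timestamp and windows aligned to multiples of ADVANCE_MS.
    static List<Long> windowStartsFor(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, timestamp - SIZE_MS + ADVANCE_MS) / ADVANCE_MS * ADVANCE_MS;
        for (; start <= timestamp; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Model of fetch(key, timeFrom, timeTo): both bounds are inclusive and are
    // compared against the window START time only; the window end plays no role.
    static List<Long> fetch(NavigableMap<Long, String> windowsByStart, long timeFrom, long timeTo) {
        if (timeFrom > timeTo) {
            return new ArrayList<>();
        }
        return new ArrayList<>(windowsByStart.subMap(timeFrom, true, timeTo, true).keySet());
    }

    public static void main(String[] args) {
        long recordTime = 120 * MINUTE_MS; // hypothetical record timestamp
        NavigableMap<Long, String> store = new TreeMap<>();
        for (long start : windowStartsFor(recordTime)) {
            store.put(start, "reduced-value"); // placeholder for the reduced value
        }
        long lastStart = store.lastKey();
        long lastEnd = lastStart + SIZE_MS;
        System.out.println("windows holding the record: " + store.size());
        System.out.println("fetch(0, recordTime) hits: " + fetch(store, 0, recordTime).size());
        System.out.println("fetch(lastStart, lastStart): " + fetch(store, lastStart, lastStart));
        System.out.println("fetch(lastEnd, lastEnd + 1): " + fetch(store, lastEnd, lastEnd + 1));
    }
}
```

In this model a single record lands in several overlapping windows, a range starting at 0 finds all of them, and a range placed at a window's end time finds nothing because only start times are compared.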
