kafka-users mailing list archives

From Jon Yeargers <jon.yearg...@cedexis.com>
Subject Re: Understanding ReadOnlyWindowStore.fetch
Date Wed, 29 Mar 2017 16:24:23 GMT
To be a bit more specific:

If I call this:

        KTable<Windowed<String>, String> kt =
                sourceStream.groupByKey().reduce(..., "somekeystore");

and then call this:

        kt.foreach(...)

Can I assume that everything that comes out will be available in
"somekeystore"? If not, what subset should I expect to find there?

On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yeargers@cedexis.com>
wrote:

> But if a key shows up in a KTable->forEach should it be available in the
> StateStore (from the KTable)?
>
> On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <michael@confluent.io>
> wrote:
>
>> Jon,
>>
>> there's a related example, using a window store and a key-value store, at
>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
>> (this is for Confluent 3.2 / Kafka 0.10.2).
>>
>> -Michael
>>
>>
>>
>> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yeargers@cedexis.com>
>> wrote:
>>
>> > I'm only running one instance (locally) to keep things simple.
>> >
>> > Reduction:
>> >
>> >         KTable<Windowed<String>, String> hourAggStore =
>> >                 sourceStream.groupByKey().reduce(rowReducer,
>> >                         TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
>> >                         "HourAggStore");
>> >
>> > then I get values to look for via:
>> >
>> >         hourAggStore.foreach((k, v) -> {
>> >                 LogLine logLine = objectMapper.readValue(v, LogLine.class);
>> >                 LOGGER.debug("{}", k.key());
>> >         });
>> >
>> > I've kept it easy by requesting everything from 0 to
>> > 'System.currentTimeMillis()'. Retrieval is done using a snip from your
>> > sample code "windowedByKey".
>> >
>> > Requests are sent in via curl and output through the same channel. I pass
>> > in the key and ask for any values.
>> >
>> > I've looked at the values passed in / out of the reduction function and they
>> > look sane.
>> >
>> > My assumption is that if a value shows up in the 'foreach' loop this
>> > implies it exists in the StateStore. Accurate?
>> >
>> > In fact, only about one in 10 requests actually returns any values. No
>> > errors - just no data.
>> >
>> >
>> >
>> > On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian.guy@gmail.com>
>> wrote:
>> >
>> > > Hi Jon,
>> > >
>> > > If you are able to get a handle on the store, i.e., via
>> > > KafkaStreams.store(...) and call fetch without any exceptions, then the
>> > > store is available.
>> > > The time params to fetch are the boundaries to search for windows for the
>> > > given key. They relate to the start time of the window, so if you did
>> > > fetch(key, t1, t2) - it will find all the windows for key that start in the
>> > > inclusive time range t1 - t2.
>> > >
>> > > Are you running more than one instance? If yes, then you want to make sure
>> > > that you are querying the correct instance. For that you can use:
>> > > KafkaStreams.metadataForKey(...) to find the instance that has the key you
>> > > are looking for.
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > >
>> > >
>> > > On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yeargers@cedexis.com>
>> > > wrote:
>> > >
>> > > > I'm probing about trying to find a way to solve my aggregation -> db
>> > > > issue. Looking at the '.fetch()' function I'm wondering about the
>> > > > 'timeFrom' and 'timeTo' params as not a lot is mentioned about 'proper'
>> > > > usage.
>> > > >
>> > > > The test in
>> > > >
>> > > > https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
>> > > > makes it appear that the params are boundaries and that it will return an
>> > > > inclusive list of every key/window combination. Truth?
>> > > >
>> > > > My tests to this end haven't returned anything.
>> > > >
>> > > > I'm watching the values coming out of the KTable<Windowed<String>, String>
>> > > > so I can send them back as request params. What I've tried:
>> > > >
>> > > > - Windowed.key(), Windowed.window().start() and Windowed.window().end()
>> > > > - Windowed.key(), (Windowed.window().start() - 1) and (Windowed.window().end() + 1)
>> > > > - Windowed.key(), 0 and Windowed.window().end()
>> > > > - Windowed.key(), 0 and (Windowed.window().end() + 1)
>> > > >
>> > > > None of these seem to hit anything in the StateStore.
>> > > >
>> > > > Is there a delay before Store values become available for '.fetch()'?
>> > > >
>> > >
>> >
>>
>
>
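
For reference, here is a rough sketch of the fetch semantics Damian describes
above (windows are matched by their start time, inclusive on both ends),
combined with the 65-minute / 5-minute hopping windows from the reduction.
This is not the Kafka Streams API: WindowFetchSketch and its put, fetch and
windowStartsFor methods are hypothetical names for a plain in-memory model.
It assumes windows are aligned to multiples of the advance interval, ignores
retention ("until"), and overwrites values instead of reducing them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a windowed store; NOT the Kafka Streams API.
final class WindowFetchSketch {
    static final long SIZE_MS = 65 * 60 * 1000L;    // window size from the thread
    static final long ADVANCE_MS = 5 * 60 * 1000L;  // hop interval from the thread

    // key -> (window start time -> latest value written to that window)
    private final Map<String, TreeMap<Long, String>> windowsByKey = new HashMap<>();

    // Start times of every hopping window [start, start + SIZE_MS) that
    // contains the timestamp, assuming starts are multiples of ADVANCE_MS.
    static List<Long> windowStartsFor(long timestamp) {
        List<Long> starts = new ArrayList<>();
        long first = Math.max(0, timestamp - SIZE_MS + ADVANCE_MS) / ADVANCE_MS * ADVANCE_MS;
        for (long start = first; start <= timestamp; start += ADVANCE_MS) {
            starts.add(start);
        }
        return starts;
    }

    // Simplification: store the value in every window the record falls into
    // (a real reduce would combine it with the window's previous value).
    void put(String key, long timestamp, String value) {
        TreeMap<Long, String> windows = windowsByKey.computeIfAbsent(key, k -> new TreeMap<>());
        for (long start : windowStartsFor(timestamp)) {
            windows.put(start, value);
        }
    }

    // Windows for the key whose START time lies in [timeFrom, timeTo].
    // Requires timeFrom <= timeTo (TreeMap.subMap rejects a reversed range).
    NavigableMap<Long, String> fetch(String key, long timeFrom, long timeTo) {
        TreeMap<Long, String> windows = windowsByKey.get(key);
        if (windows == null) {
            return new TreeMap<>();
        }
        return windows.subMap(timeFrom, true, timeTo, true);
    }

    public static void main(String[] args) {
        WindowFetchSketch store = new WindowFetchSketch();
        long ts = 72 * 60 * 1000L; // a record 72 minutes after time zero
        store.put("some-key", ts, "some-value");

        // Every window for the key that started between 0 and the record time.
        System.out.println(store.fetch("some-key", 0L, ts).size());
        // A narrow range that contains no window start returns nothing,
        // even though the record's timestamp lies inside it.
        System.out.println(store.fetch("some-key", ts, ts + 1).size());
    }
}
```

In this model a single record lands in several overlapping windows, and a
fetch only finds it when the requested range covers at least one window
start time; the record's own timestamp does not matter to the lookup.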
