kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: Understanding ReadOnlyWindowStore.fetch
Date Wed, 29 Mar 2017 23:48:05 GMT
It's based on "stream time", i.e., the internally tracked progress based
on the timestamps returned by the TimestampExtractor.

-Matthias
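
For illustration, a minimal TimestampExtractor sketch showing where that
"stream time" comes from. It assumes the Kafka 0.10.2 extract(record,
previousTimestamp) signature and a JSON payload carrying an "eventTime" field
in epoch milliseconds; neither is code from this thread.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch only: assumes the record value is a JSON string with an
    // "eventTime" field (the actual LogLine schema in this thread is not shown).
    public class PayloadTimestampExtractor implements TimestampExtractor {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            try {
                final JsonNode node = MAPPER.readTree((String) record.value());
                return node.get("eventTime").asLong();
            } catch (final Exception e) {
                // Fall back to the timestamp the producer/broker attached to the record.
                return record.timestamp();
            }
        }
    }

The extractor is wired in via the timestamp.extractor config
(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); the windowed operations below
then advance their notion of time from whatever it returns.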

On 3/29/17 12:52 PM, Jon Yeargers wrote:
> So '.until()' is based on clock time / elapsed time (i.e., record age) /
> something else?
> 
> I'm seeing lots of records come through that can't be found in the Store -
> are these 'old' and already expired?
> 
> Going forward - it would be useful to have different forms of '.until()' so
> one could consume old records (e.g., if one was catching up from lag) without
> having to worry about them immediately disappearing.
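
For what it's worth, a longer retention can already be expressed with the
existing API: until() takes the retention period in milliseconds, independently
of the window size. A sketch along the lines of the reduction shown later in
this thread (sourceStream, rowReducer and the store name are borrowed from
there):

    // 65-minute hopping windows advancing every 5 minutes, retained for 7 days
    // so that windows written while catching up on lag remain queryable.
    KTable<Windowed<String>, String> hourAgg = sourceStream
            .groupByKey()
            .reduce(rowReducer,
                    TimeWindows.of(65 * 60 * 1000L)
                            .advanceBy(5 * 60 * 1000L)
                            .until(7 * 24 * 60 * 60 * 1000L),   // retention, not window size
                    "HourAggStore");

The trade-off is that the state store and its changelog topic hold
correspondingly more data for the longer retention period.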
> 
> On Wed, Mar 29, 2017 at 10:37 AM, Damian Guy <damian.guy@gmail.com> wrote:
> 
>> Jon,
>>
>> You should be able to query anything that has not expired, i.e., based on
>> TimeWindows.until(..).
>>
>> Thanks,
>> Damian
>>
>> On Wed, 29 Mar 2017 at 17:24 Jon Yeargers <jon.yeargers@cedexis.com>
>> wrote:
>>
>>> To be a bit more specific:
>>>
>>> If I call this:         KTable<Window, String> kt =
>>> sourceStream.groupByKey().reduce(..., "somekeystore");
>>>
>>> and then call this:
>>>
>>> kt.foreach((k, v) -> ...);
>>>
>>> Can I assume that everything that comes out will be available in
>>> "somekeystore"? If not, what subset should I expect to find there?
>>>
>>> On Wed, Mar 29, 2017 at 8:34 AM, Jon Yeargers <jon.yeargers@cedexis.com>
>>> wrote:
>>>
>>>> But if a key shows up in a KTable.foreach() should it be available in the
>>>> StateStore (from the KTable)?
>>>>
>>>> On Wed, Mar 29, 2017 at 6:31 AM, Michael Noll <michael@confluent.io>
>>>> wrote:
>>>>
>>>>> Jon,
>>>>>
>>>>> there's a related example, using a window store and a key-value store, at
>>>>> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
>>>>> (this is for Confluent 3.2 / Kafka 0.10.2).
>>>>>
>>>>> -Michael
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 29, 2017 at 3:12 PM, Jon Yeargers <jon.yeargers@cedexis.com>
>>>>> wrote:
>>>>>
>>>>>> I'm only running one instance (locally) to keep things simple.
>>>>>>
>>>>>> Reduction:
>>>>>>
>>>>>>         KTable<Windowed<String>, String> hourAggStore =
>>>>>>                 sourceStream.groupByKey().reduce(rowReducer,
>>>>>>                         TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000L).until(70 * 60 * 1000L),
>>>>>>                         "HourAggStore");
>>>>>>
>>>>>> then I get values to look for via:
>>>>>>
>>>>>>         hourAggStore.foreach((k, v) -> {
>>>>>>                 LogLine logLine = objectMapper.readValue(v, LogLine.class);
>>>>>>                 LOGGER.debug("{}", k.key());
>>>>>>         });
>>>>>>
>>>>>> I've kept it easy by requesting everything from 0 to
>>>>>> 'System.currentTimeMillis()'. Retrieval is done using a snippet from
>>>>>> your sample code "windowedByKey".
>>>>>>
>>>>>> Requests are sent in via curl and output through the same channel. I
>>>>>> pass in the key and ask for any values.
>>>>>>
>>>>>> I've looked at the values passed in / out of the reduction function and
>>>>>> they look sane.
>>>>>>
>>>>>> My assumption is that if a value shows up in the 'foreach' loop, it
>>>>>> implies it exists in the StateStore. Accurate?
>>>>>>
>>>>>> In fact, only about one in 10 requests actually returns any values. No
>>>>>> errors - just no data.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 29, 2017 at 2:15 AM, Damian Guy <damian.guy@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Hi Jon,
>>>>>>>
>>>>>>> If you are able to get a handle on the store, i.e., via
>>>>>>> KafkaStreams.store(...), and call fetch without any exceptions, then
>>>>>>> the store is available.
>>>>>>> The time params to fetch are the boundaries to search for windows for
>>>>>>> the given key. They relate to the start time of the window, so if you
>>>>>>> did fetch(key, t1, t2) - it will find all the windows for key that
>>>>>>> start in the inclusive time range t1 - t2.
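
As a concrete sketch of that fetch (the store name matches the reduction shown
earlier in this thread; "streams" and the key are placeholders):

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyWindowStore;
    import org.apache.kafka.streams.state.WindowStoreIterator;

    static void printWindows(final KafkaStreams streams, final String key) {
        final ReadOnlyWindowStore<String, String> store =
                streams.store("HourAggStore", QueryableStoreTypes.<String, String>windowStore());

        final long timeTo = System.currentTimeMillis();
        final long timeFrom = timeTo - 70 * 60 * 1000L;   // look back about one retention period

        // Returns every window for "key" whose start time falls in [timeFrom, timeTo].
        try (WindowStoreIterator<String> iterator = store.fetch(key, timeFrom, timeTo)) {
            while (iterator.hasNext()) {
                final KeyValue<Long, String> entry = iterator.next();
                System.out.println("window starting at " + entry.key + " -> " + entry.value);
            }
        }
    }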
>>>>>>>
>>>>>>> Are you running more than one instance? If yes, then you want to make
>>>>>>> sure that you are querying the correct instance. For that you can use
>>>>>>> KafkaStreams.metadataForKey(...) to find the instance that has the key
>>>>>>> you are looking for.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Damian
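
A sketch of that lookup, assuming each instance sets application.server to its
own host:port (the store name, key and serializer are placeholders matching the
rest of this thread):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.HostInfo;
    import org.apache.kafka.streams.state.StreamsMetadata;

    static boolean ownsKey(final KafkaStreams streams, final String key, final HostInfo self) {
        // Which instance hosts the store partition holding this key?
        final StreamsMetadata metadata =
                streams.metadataForKey("HourAggStore", key, Serdes.String().serializer());
        // If it is not this instance, forward the query to metadata.host():metadata.port().
        return metadata.hostInfo().equals(self);
    }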
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 28 Mar 2017 at 22:37 Jon Yeargers <jon.yeargers@cedexis.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm probing around trying to find a way to solve my aggregation -> db
>>>>>>>> issue. Looking at the '.fetch()' function I'm wondering about the
>>>>>>>> 'timeFrom' and 'timeTo' params, as not a lot is mentioned about
>>>>>>>> 'proper' usage.
>>>>>>>>
>>>>>>>> The test in
>>>>>>>>
>>>>>>>> https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java#L200-L212
>>>>>>>>
>>>>>>>> makes it appear that the params are boundaries and that it will return
>>>>>>>> an inclusive list of every key/window combination. Truth?
>>>>>>>>
>>>>>>>> My tests to this end haven't returned anything.
>>>>>>>>
>>>>>>>> I'm watching the values coming out of the KTable<Window, String> so I
>>>>>>>> can send them back as request params. What I've tried:
>>>>>>>>
>>>>>>>> - Window.key(), Window.key().start() and Window.key().end()
>>>>>>>> - Window.key(), (Window.key().start() - 1) and (Window.key().end() + 1)
>>>>>>>> - Window.key(), 0 and Window.key().end()
>>>>>>>> - Window.key(), 0 and (Window.key().end() + 1)
>>>>>>>>
>>>>>>>> None of these seem to hit anything in the StateStore.
>>>>>>>>
>>>>>>>> Is there a delay before Store values become available for '.fetch()'?
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
> 

