kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eno Thereska <eno.there...@gmail.com>
Subject Re: accessing state-store ala WordCount example
Date Wed, 07 Dec 2016 11:33:58 GMT
Hi Jon,

The "/windowed" namel in the web server example is just an example name, it could have been
called differently too. It is built however on the Interactive Query APIs which are fixed.
In the example code I mentioned we see the implementation as shown below. Again, the web server
code is just a reference implementation that you can feel free to change or call differently
(as long as you call store.fetch etc correctly). 

  public List<KeyValueBean> windowedByKey(@PathParam("storeName") final String storeName,
                                          @PathParam("key") final String key,
                                          @PathParam("from") final Long from,
                                          @PathParam("to") final Long to) {

    // Lookup the WindowStore with the provided storeName
    final ReadOnlyWindowStore<String, Long> store = streams.store(storeName,
    if (store == null) {
      throw new NotFoundException();

    // fetch the window results for the given key and time range
    final WindowStoreIterator<Long> results = store.fetch(key, from, to);

    final List<KeyValueBean> windowResults = new ArrayList<>();
    while (results.hasNext()) {
      final KeyValue<Long, Long> next = results.next();
      // convert the result to have the window time and the key (for display purposes)
      windowResults.add(new KeyValueBean(key + "@" + next.key, next.value));
    return windowResults;

> On 7 Dec 2016, at 11:13, Jon Yeargers <jon.yeargers@cedexis.com> wrote:
> Im having trouble finding documentation on this new feature. Can you point
> me to anything?
> Specifically on how to get available "from/to" values but more generally on
> how to use the "windowed" query.
> On Wed, Dec 7, 2016 at 1:25 AM, Eno Thereska <eno.thereska@gmail.com> wrote:
>> Hi Jon,
>> This will be a windowed store. Have a look at the Jetty-server bits for
>> windowedByKey:
>> "/windowed/{storeName}/{key}/{from}/{to}"
>> Thanks
>> Eno
>>> On 6 Dec 2016, at 23:33, Jon Yeargers <jon.yeargers@cedexis.com> wrote:
>>> I copied out some of the WordCountInteractive
>>> <https://github.com/confluentinc/examples/blob/3.
>> 1.x/kafka-streams/src/main/java/io/confluent/examples/
>> streams/interactivequeries/WordCountInteractiveQueriesExample.java>
>>> demo
>>> code to see how the REST access works. I have an aggregator
>>> groupByKey().aggregate(LogLine::new,
>>>   new aggregate(),
>>>   TimeWindows.of(60 * 60 * 1000L),
>>>   collectorSerde, "agg_stream");
>>> I incorporated the Jetty-server bits from the sample. When I run it I can
>>> see results via the '/states/instances' entry point but nothing from the
>>> '/states/keyvalues/agg_stream/all'.
>>> The aggregator is churning away so I'd assume the state store would have
>>> plenty of key/value pairs but it comes up empty.
>>> What's the proper way to use this?

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message