kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Francesco Frontera <francesco.front...@radicalbit.io>
Subject Re: KStream and reduce
Date Tue, 11 Sep 2018 12:10:13 GMT
Hi Toni,

it looks like that the local Kafka Streams instance is not yet read, for
this reason, local state store can't be queried yet.
you could try to use the methods explained in Kafka Streams documentation
<https://docs.confluent.io/current/streams/faq.html#id28> for avoiding this
problem:

// Example: Wait until the store of type T is queryable.  When it is,
return a reference to the store.public static <T> T
waitUntilStoreIsQueryable(final String storeName,
                                              final
QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams
streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }}

I hope I've been of some help.

Best regards,
*Francesco Frontera*

*Software Engineer @ Radicalbit*
*Via Borsieri 41, 20159, Milano - IT*


Il giorno mar 11 set 2018 alle ore 13:49 Toni Zehnder
<toni.zehnder@namics.com.invalid> ha scritto:

> Hi guys,
> I try to get more familar with Kafka. So I made a small spring boot
> application.
> I have a simple Kafka with topic "stock" where I put a productId as key and
> an Integer as stock change into it. e.g. {10001, -5};{10001, 2}.
> Now I want to create a materialized view to get the current stock level of
> a specific product:
>
> @Bean
> public KafkaStreams stockStreams() {
>     StreamsConfig config = stockStreamsConfig();
>     StreamsBuilder streamsBuilder = new StreamsBuilder();
>
>     KStream<Long, Integer> stockStream =
> streamsBuilder.stream(STOCK_TOPIC);
>
>     stockStream.groupByKey().reduce((value1, value2) -> value1 +
> value2, Materialized.as(STOCK_STORE));
>     KafkaStreams streams = new KafkaStreams(streamsBuilder.build(),
> config);
>
>     streams.start();
>
>     return streams;
> }
>
> The problem is when i try to acces the store:
>
> stockStreams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())
>
> it's not available anymore:
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> store, stockStore, may have migrated to another instance.
> at
>
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
> ~[kafka-streams-1.0.2.jar:na]
> at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
> ~[kafka-streams-1.0.2.jar:na]
>
> If I make a count instead of reduce I can access the materialized view (but
> of course with the wrong stock level)
>
> Has anyone an advice?
>
> best regards
> Toni
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message