kafka-users mailing list archives

From Toni Zehnder <toni.zehn...@namics.com.INVALID>
Subject Re: KStream and reduce
Date Tue, 11 Sep 2018 13:30:34 GMT
Hi Francesco,
I already tried that, but the store never becomes ready. When I restarted my
application, the store was ready instantly. I could reproduce this.
Could it be that a store created while the stream contains no messages
never becomes ready?
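
To tell "never becomes ready" apart from "not ready yet", the retry loop from the FAQ can be given a deadline. The following is only a minimal sketch in plain Java: `BoundedRetry` and `retryUntil` are hypothetical names, not part of Kafka Streams, and the sketch catches any `RuntimeException` so that it runs without the Kafka jars. In the application, the lookup would be something like `() -> streams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())`, and the catch would be narrowed to `InvalidStateStoreException`.

```java
import java.time.Duration;
import java.util.function.Supplier;

final class BoundedRetry {
    private BoundedRetry() {}

    /**
     * Calls lookup until it returns without throwing, or fails once the timeout has elapsed.
     * Assumption: "not ready" is signalled by a RuntimeException; with Kafka Streams this
     * would be narrowed to InvalidStateStoreException.
     */
    static <T> T retryUntil(Supplier<T> lookup, Duration timeout, Duration pause) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException notReady) {
                if (System.nanoTime() - deadline >= 0) {
                    // Give up and keep the last failure as the cause for diagnosis.
                    throw new IllegalStateException("not ready after " + timeout, notReady);
                }
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and stop waiting.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", interrupted);
                }
            }
        }
    }
}
```

With a deadline in place, a store that really never becomes ready shows up as an `IllegalStateException` carrying the last failure as its cause, instead of a loop that spins forever.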

The configuration looks like this:

StreamsConfig config = stockStreamsConfig();
StreamsBuilder streamsBuilder = new StreamsBuilder();

KStream<Long, Long> stockStream = streamsBuilder.stream(STOCK_TOPIC);

stockStream.groupByKey()
           .reduce((value1, value2) -> {
               LOG.info("Reduce {} and {}", value1, value2);
               return value1 + value2;
           }, Materialized.as(STOCK_STORE));

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);

streams.start();

return streams;

So if I use reduce, must there be some entries in the stream before the store
is created or becomes ready?
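
As a side note on what to expect from the log statement above: in Kafka Streams, `reduce` takes the first record for a key as the initial aggregate and applies the reducer only from the second record on. The sketch below imitates that behaviour with a plain `HashMap` so that it runs without Kafka. `ReduceSketch`, `accept`, and `stockOf` are hypothetical names used for illustration only. It models the per-key arithmetic, not the store's readiness.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReduceSketch {
    /** Reducer invocations recorded as "left+right", standing in for the LOG.info call. */
    final List<String> reducerCalls = new ArrayList<>();

    /** Stand-in for the materialized store: current stock level per productId. */
    private final Map<Long, Long> stockByProduct = new HashMap<>();

    /** Feeds one (productId, stock change) record into the aggregation. */
    void accept(long productId, long change) {
        // Map.merge stores the value directly when the key is absent and calls the
        // function only when a previous value exists, like reduce does per key.
        stockByProduct.merge(productId, change, (current, delta) -> {
            reducerCalls.add(current + "+" + delta);
            return current + delta;
        });
    }

    /** Returns the current stock level, or null if no record has been seen for the key. */
    Long stockOf(long productId) {
        return stockByProduct.get(productId);
    }
}
```

Feeding in {10001, -5} and then {10001, 2} invokes the reducer once, for the second record, and leaves a stock level of -3. The absence of a "Reduce ..." log line after a single record per key is therefore expected and does not by itself say anything about the store.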

Best regards
Toni


On Tue, 11 Sep 2018 at 14:10, Francesco Frontera
<francesco.frontera@radicalbit.io> wrote:

> Hi Toni,
>
> It looks like the local Kafka Streams instance is not yet ready; for
> this reason, the local state store can't be queried yet.
> You could try the method explained in the Kafka Streams documentation
> <https://docs.confluent.io/current/streams/faq.html#id28> to avoid this
> problem:
>
> // Example: Wait until the store of type T is queryable. When it is,
> // return a reference to the store.
> public static <T> T waitUntilStoreIsQueryable(final String storeName,
>                                               final QueryableStoreType<T> queryableStoreType,
>                                               final KafkaStreams streams) throws InterruptedException {
>   while (true) {
>     try {
>       return streams.store(storeName, queryableStoreType);
>     } catch (InvalidStateStoreException ignored) {
>       // store not yet ready for querying
>       Thread.sleep(100);
>     }
>   }
> }
>
> I hope I've been of some help.
>
> Best regards,
> *Francesco Frontera*
>
> *Software Engineer @ Radicalbit*
> *Via Borsieri 41, 20159, Milano - IT*
>
>
> On Tue, 11 Sep 2018 at 13:49, Toni Zehnder
> <toni.zehnder@namics.com.invalid> wrote:
>
> > Hi guys,
> > I am trying to get more familiar with Kafka, so I made a small Spring Boot
> > application.
> > I have a simple Kafka setup with a topic "stock", into which I put a
> > productId as key and an Integer as stock change, e.g. {10001, -5};{10001, 2}.
> > Now I want to create a materialized view to get the current stock level of
> > a specific product:
> >
> > @Bean
> > public KafkaStreams stockStreams() {
> >     StreamsConfig config = stockStreamsConfig();
> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
> >
> >     KStream<Long, Integer> stockStream = streamsBuilder.stream(STOCK_TOPIC);
> >
> >     stockStream.groupByKey()
> >                .reduce((value1, value2) -> value1 + value2, Materialized.as(STOCK_STORE));
> >     KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);
> >
> >     streams.start();
> >
> >     return streams;
> > }
> >
> > The problem is that when I try to access the store:
> >
> > stockStreams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())
> >
> > it's not available anymore:
> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, stockStore, may have migrated to another instance.
> >     at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) ~[kafka-streams-1.0.2.jar:na]
> >     at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) ~[kafka-streams-1.0.2.jar:na]
> >
> > If I use count instead of reduce, I can access the materialized view
> > (but of course with the wrong stock level).
> >
> > Does anyone have any advice?
> >
> > best regards
> > Toni
> >
>
