kafka-users mailing list archives

From Toni Zehnder <toni.zehn...@namics.com.INVALID>
Subject KStream and reduce
Date Tue, 11 Sep 2018 11:49:14 GMT
Hi guys,
I am trying to get more familiar with Kafka, so I made a small Spring Boot
application.
I have a simple Kafka setup with a topic "stock", into which I put a productId
as the key and an Integer stock change as the value, e.g. {10001, -5};{10001, 2}.
Now I want to create a materialized view to get the current stock level of
a specific product:

@Bean
public KafkaStreams stockStreams() {
    StreamsConfig config = stockStreamsConfig();
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    KStream<Long, Integer> stockStream = streamsBuilder.stream(STOCK_TOPIC);

    stockStream.groupByKey().reduce((value1, value2) -> value1 + value2,
            Materialized.as(STOCK_STORE));
    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);

    streams.start();

    return streams;
}

The problem is that when I try to access the store:

stockStreams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())

it's not available anymore:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, stockStore, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) ~[kafka-streams-1.0.2.jar:na]
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) ~[kafka-streams-1.0.2.jar:na]

If I use count instead of reduce, I can access the materialized view (but
of course it then holds the wrong stock level).
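
To make the difference concrete, here is a minimal plain-Java sketch (no Kafka
involved) of what each view would hold for the two sample records above. The
class and method names are hypothetical and only mimic the per-key semantics of
reduce and count; they are not part of the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StockViewSketch {

    // Mimics groupByKey().reduce((v1, v2) -> v1 + v2): running sum per key.
    static Map<Long, Integer> reduceSum(List<Map.Entry<Long, Integer>> records) {
        Map<Long, Integer> view = new LinkedHashMap<>();
        for (Map.Entry<Long, Integer> r : records) {
            view.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return view;
    }

    // Mimics groupByKey().count(): number of records seen per key.
    static Map<Long, Long> count(List<Map.Entry<Long, Integer>> records) {
        Map<Long, Long> view = new LinkedHashMap<>();
        for (Map.Entry<Long, Integer> r : records) {
            view.merge(r.getKey(), 1L, Long::sum);
        }
        return view;
    }

    public static void main(String[] args) {
        // The sample records from the "stock" topic: {10001, -5};{10001, 2}
        List<Map.Entry<Long, Integer>> records = List.of(
                Map.entry(10001L, -5),
                Map.entry(10001L, 2));
        System.out.println("reduce: " + reduceSum(records)); // reduce: {10001=-3}
        System.out.println("count:  " + count(records));     // count:  {10001=2}
    }
}
```

So for product 10001 the reduce view should give a stock level of -3, while
count only gives the number of stock changes, 2.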

Does anyone have any advice?

best regards
Toni
