kafka-users mailing list archives

From József Molnár <molnarjozsef.1...@gmail.com>
Subject Process messages from StateStore
Date Mon, 18 Jun 2018 09:18:56 GMT
Hi!

I have an application which uses an input and output topic, and every
message from the input topic should have a corresponding message (with the
same key) in the output topic.

To detect lost messages (no output after a certain amount of time, roughly
10 days), I tried to use a KTable-KTable left join and to check where the
output values are null in the resulting KTable's state store.

Sample code:
// Stream setup
StreamsBuilder builder = new StreamsBuilder();
KTable<String, InboundMsg> inputTable =
        builder.table("inputTopic", Consumed.with(...)).filter(...);
KTable<String, OutboundMsg> outputTable =
        builder.table("outputTopic", Consumed.with(...));

Materialized<String, InboundMsg, KeyValueStore<Bytes, byte[]>> store =
        Materialized.<String, InboundMsg, KeyValueStore<Bytes, byte[]>>as("Store")...;
// KTable#leftJoin takes a ValueJoiner as its second argument
KTable<String, InboundMsg> joinedTable =
        inputTable.leftJoin(outputTable, valueJoiner, store);

// Read from store (streams is the running KafkaStreams instance)
ReadOnlyKeyValueStore<String, InboundMsg> keyValueStore =
        streams.store("Store", QueryableStoreTypes.keyValueStore());
KeyValueIterator<String, InboundMsg> allMsg = keyValueStore.all();
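
To make the intended check concrete, here is a minimal plain-Java model of
the left-join idea described above. It is only a sketch and does not use
Kafka Streams: the class name LostMessageCheck, the method findLost, and the
per-key input timestamp are all hypothetical, introduced for illustration.
An input key counts as lost when it has no matching output key and its
input is older than the timeout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the lost-message check; NOT Kafka Streams code.
final class LostMessageCheck {

    // inputTimestamps: key -> time (ms) the input message was seen (assumed to be available)
    // outputKeys:      keys that already have a message in the output topic
    // Returns, in sorted order, the keys with no output whose input is older than timeoutMs.
    static List<String> findLost(Map<String, Long> inputTimestamps,
                                 Set<String> outputKeys,
                                 long nowMs,
                                 long timeoutMs) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, Long> entry : inputTimestamps.entrySet()) {
            // In a left join, a missing right-hand side shows up as a null output value.
            boolean noOutput = !outputKeys.contains(entry.getKey());
            boolean expired = nowMs - entry.getValue() > timeoutMs;
            if (noOutput && expired) {
                lost.add(entry.getKey());
            }
        }
        Collections.sort(lost);
        return lost;
    }
}
```

Like keyValueStore.all(), this model walks every entry, which is exactly
the full scan the question below is trying to avoid.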

Is there any other way to read from the state store and possibly stream its
contents to a topic? As there can be a couple of million messages in the
topics, reading all of them with an iterator will not be performant enough.

Thanks,
Jozsef
