kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: InvalidStateStoreException on KStream join using GlobalKtables
Date Wed, 08 May 2019 00:32:28 GMT
Hello Ishita,

Is this consistently reproducible? And which Kafka version are you using?


Guozhang


On Thu, May 2, 2019 at 5:24 PM Ishita Rakshit <mail2ishi@gmail.com> wrote:

> Hi,
> I have a Kafka Streams application where I am joining a KStream that reads
> from "topic1" with a GlobalKTable that reads from "topic2" and then with
> another GlobalKTable that reads from "topic3". Here is the pseudocode:
>
>
> > KStream<String, GenericRecord> topic1KStream =
> >             builder.stream(
> >                 "topic1",
> >                 Consumed.with(Serdes.String(), genericRecordSerde)
> >             );
> >  GlobalKTable<String, GenericRecord> topic2KTable =
> >             builder.globalTable(
> >                 "topic2",
> >                 Consumed.with(Serdes.String(), genericRecordSerde),
> >                 Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic2-global-store")
> >                     .withKeySerde(Serdes.String())
> >                     .withValueSerde(genericRecordSerde)
> >             );
> >    GlobalKTable<String, GenericRecord> topic3KTable =
> >             builder.globalTable(
> >                 "topic3",
> >                 Consumed.with(Serdes.String(), genericRecordSerde),
> >                 Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic3-global-store")
> >                     .withKeySerde(Serdes.String())
> >                     .withValueSerde(genericRecordSerde)
> >             );
> >
> > KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join(
> >             topic2KTable,
> >             (topic2Id, topic1Obj) -> topic1Obj.get("id").toString(),
> >             (topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj)
> >         );
> >         final KStream<String, GenericRecord> enrichedStream = stream_topic1_topic2.join(
> >             topic3KTable,
> >             (topic2Id, mergedObj) -> mergedObj.topic3Id(),
> >             (mergedObj, topic3Obj) -> new Enriched(
> >                 mergedObj.topic1Obj,
> >                 mergedObj.topic2Obj,
> >                 topic3Obj
> >             ).enrich()
> >         );
> >  enrichedStream.to("enrichedStreamTopic", Produced.with(Serdes.String(), getGenericRecordSerde()));
>
>
> The above code is very similar to this -
>
> https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java
>
> When I push messages to all three topics at the same time, I get the
> following exception:
>
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1, partition=1, offset=61465, stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException: Store topic2-global-store is currently closed.
> >     at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
> >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37)
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245)
> >     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> >     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
> >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
> >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
> >     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
> >     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
>
>
> If I push messages to these topics one at a time, I do not get this
> exception.
>
> Kindly advise how to fix this exception.
>
> Thanks for your help in advance.
>
> Regards,
> Ishita
>


-- 
-- Guozhang
