kafka-users mailing list archives

From Ishita Rakshit <mail2i...@gmail.com>
Subject Re: InvalidStateStoreException on KStream join using GlobalKtables
Date Thu, 09 May 2019 13:33:35 GMT
Hi Guozhang,
I am using Kafka 2.2.0. The issue is resolved now. We had set
auto.register.schemas=false
because we wanted to register the schemas manually. Setting the flag to
true fixed it, as the Streams application needs to register schemas for
its internal topics.
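
For readers applying the same fix, here is a minimal sketch of the serde configuration involved. It assumes that the genericRecordSerde from the original post is a Schema Registry-backed Avro serde configured through a plain config map; the thread does not confirm this. The class name, the helper method, and the registry URL are hypothetical; only the two property keys are the standard Schema Registry client settings.

```java
import java.util.HashMap;
import java.util.Map;

public class SerdeConfigSketch {

    // Builds the config map that would be passed to the serde's configure(...) call.
    // The helper itself is hypothetical; the two keys are the usual
    // Schema Registry client property names.
    static Map<String, Object> serdeConfig(String registryUrl, boolean autoRegister) {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", registryUrl);
        // false means schemas must already exist in the registry;
        // true lets the serializer register missing schemas itself.
        config.put("auto.register.schemas", autoRegister);
        return config;
    }

    public static void main(String[] args) {
        // Assumed local registry address, for illustration only.
        Map<String, Object> config = serdeConfig("http://localhost:8081", true);
        System.out.println(config.get("auto.register.schemas"));
        // With an Avro serde on the classpath, the map would be applied roughly as:
        //   genericRecordSerde.configure(config, false); // false = value serde
    }
}
```

If manual registration is still required, an alternative would be to pre-register the subjects for the application's internal topics before startup, but that was not tested in this thread.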

Thanks,
Ishita Rakshit

On Tue, May 7, 2019 at 8:32 PM Guozhang Wang <wangguoz@gmail.com> wrote:

> Hello Ishita,
>
> Is it consistently reproducible? And which Kafka version are you using?
>
>
> Guozhang
>
>
> On Thu, May 2, 2019 at 5:24 PM Ishita Rakshit <mail2ishi@gmail.com> wrote:
>
> > Hi,
> > I have a Kafka Streams application where I am joining a KStream that
> > reads from "topic1" with a GlobalKTable that reads from "topic2" and then
> > with another GlobalKTable that reads from "topic3". Here is the pseudocode:
> >
> >
> > > KStream<String, GenericRecord> topic1KStream =
> > >     builder.stream(
> > >         "topic1",
> > >         Consumed.with(Serdes.String(), genericRecordSerde)
> > >     );
> > > GlobalKTable<String, GenericRecord> topic2KTable =
> > >     builder.globalTable(
> > >         "topic2",
> > >         Consumed.with(Serdes.String(), genericRecordSerde),
> > >         Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic2-global-store")
> > >             .withKeySerde(Serdes.String())
> > >             .withValueSerde(genericRecordSerde)
> > >     );
> > > GlobalKTable<String, GenericRecord> topic3KTable =
> > >     builder.globalTable(
> > >         "topic3",
> > >         Consumed.with(Serdes.String(), genericRecordSerde),
> > >         Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as("topic3-global-store")
> > >             .withKeySerde(Serdes.String())
> > >             .withValueSerde(genericRecordSerde)
> > >     );
> > >
> > > KStream<String, MergedObj> stream_topic1_topic2 = topic1KStream.join(
> > >     topic2KTable,
> > >     (topic2Id, topic1Obj) -> topic1Obj.get("id").toString(),
> > >     (topic1Obj, topic2Obj) -> new MergedObj(topic1Obj, topic2Obj)
> > > );
> > > final KStream<String, GenericRecord> enrichedStream = stream_topic1_topic2.join(
> > >     topic3KTable,
> > >     (topic2Id, mergedObj) -> mergedObj.topic3Id(),
> > >     (mergedObj, topic3Obj) -> new Enriched(
> > >         mergedObj.topic1Obj,
> > >         mergedObj.topic2Obj,
> > >         topic3Obj
> > >     ).enrich()
> > > );
> > > enrichedStream.to("enrichedStreamTopic",
> > >     Produced.with(Serdes.String(), getGenericRecordSerde()));
> >
> >
> > The above code is very similar to this example:
> >
> > https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/GlobalKTablesExample.java
> >
> > When I push messages to all 3 topics at the same time, I get the
> > following exception:
> >
> > > org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=topic1, partition=1, offset=61465, stacktrace=org.apache.kafka.streams.errors.InvalidStateStoreException: Store topic2-global-store is currently closed.
> > >     at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:66)
> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:150)
> > >     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:37)
> > >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:135)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadOnlyDecorator.get(ProcessorContextImpl.java:245)
> > >     at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
> > >     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:71)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
> > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
> > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
> > >     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:890)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> >
> >
> > If I push messages to these topics one at a time, I do not get this
> > exception.
> >
> > Kindly advise how to fix this exception.
> >
> > Thanks for your help in advance.
> >
> > Regards,
> > Ishita
> >
>
>
> --
> -- Guozhang
>
