kafka-users mailing list archives

From D Stephan <kafkastre...@gmail.com>
Subject Re: Kafka Stream Exception: partition issue
Date Tue, 21 Nov 2017 07:36:52 GMT
Thanks Matthias for the information!

2017-11-20 19:23 GMT+01:00 Matthias J. Sax <matthias@confluent.io>:

> Sounds like Streams can't fetch the metadata completely.
>
> You can increase the consumer config `REQUEST_TIMEOUT_MS_CONFIG` to give
> the cluster more time to broadcast the information to all brokers.
>
> https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-and-producer-configuration-parameters
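>
> A minimal sketch of passing this to the embedded consumers via
> StreamsConfig (the broker address and the 10-minute value below are just
> examples):
>
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "joinService1");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
> // consumerPrefix() forwards request.timeout.ms to the consumers only.
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 600000);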
>
> Please let us know if this resolved the issue.
>
>
> -Matthias
>
> On 11/20/17 7:22 AM, D Stephan wrote:
> > Hello,
> >
> > We are using Kafka version 0.11.0.1 with Kafka Streams.
> > We are using the leftJoin API in Kafka Streams:
> >
> > <VO,VR> KStream<K,VR> leftJoin(KStream<K,VO> otherStream,
> >                                ValueJoiner<? super V,? super VO,? extends VR> joiner,
> >                                JoinWindows windows)
> >
> > https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/kstream/KStream.html#leftJoin(org.apache.kafka.streams.kstream.KStream,%20org.apache.kafka.streams.kstream.ValueJoiner,%20org.apache.kafka.streams.kstream.JoinWindows)
> >
> > with partitions = 30, brokers = 3, and replicas = 1.
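> >
> > For illustration, a simplified sketch of this kind of windowed left join
> > (topic names, types, and the window size below are just examples, not our
> > actual code; it assumes String serdes are configured as the defaults):
> >
> > import java.util.concurrent.TimeUnit;
> > import org.apache.kafka.streams.kstream.JoinWindows;
> > import org.apache.kafka.streams.kstream.KStream;
> > import org.apache.kafka.streams.kstream.KStreamBuilder;
> >
> > KStreamBuilder builder = new KStreamBuilder();
> >
> > // Two input streams keyed by the same key type.
> > KStream<String, String> left = builder.stream("left-topic");
> > KStream<String, String> right = builder.stream("right-topic");
> >
> > // Left join over a 5-minute window; if no matching right-side record
> > // arrives within the window, rightValue is null.
> > KStream<String, String> joined = left.leftJoin(
> >     right,
> >     (leftValue, rightValue) -> leftValue + "/" + rightValue,
> >     JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));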
> >
> > We run the streams application on 3 machine instances. When one of the
> > machines automatically restarted, we got an exception because the
> > auto-generated store name is not available anymore, and the state of the
> > instance changes from rebalancing to error.
> >
> > Here is the exception:
> >
> > org.apache.kafka.streams.errors.StreamsException: Store KSTREAM-JOINTHIS-0000000056-store's change log (joinService1-KSTREAM-JOINTHIS-0000000056-store-changelog) does not contain partition 21
> >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:91)
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:158)
> >     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> >     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
> >     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
> >     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:72)
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:220)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:566)
> >     at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:89)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:493)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> >
> >
> > Do you have any ideas to mitigate this issue?
> >
> > Thanks,
> > Danny
> >
>
>
