kafka-users mailing list archives

From Håkon Åmdal <hakon.amd...@gmail.com>
Subject Re: Adding or removing input topics to a Kafka Consumer without downtime
Date Tue, 05 Sep 2017 21:24:52 GMT
Could this be something that is only relevant to the Streams app, then?

I've also tried to add a join, but I'm getting the same issues with
repartitioning the state store. I've pasted some stack traces below.

19:14:54 ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided
listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
for group Jord failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Store
KSTREAM-JOINTHIS-0000000017-store's change log
(Jord-KSTREAM-JOINTHIS-0000000017-store-changelog) does not contain
partition 16
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:65)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)


19:14:54 ERROR c.s.s.data.yggdrasil.Yggdrasil - Uncaught exception:
Thread Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5
stopped unexpectedly after stream-thread
[Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5] Failed to
rebalance.
org.apache.kafka.streams.errors.StreamsException: stream-thread
[Jord-f5f1364e-feb5-445e-a61c-c36b15b76605-StreamThread-5] Failed to
rebalance.
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: Store
KSTREAM-JOINTHIS-0000000017-store's change log
(Jord-KSTREAM-JOINTHIS-0000000017-store-changelog) does not contain
partition 17
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:110)
	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:72)
	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:65)
	at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
	... 2 common frames omitted
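
One possible reading of the traces above, sketched in plain Java: the task being created asks for changelog partition 16 (or 17), but the existing changelog topic has fewer partitions than that. The class below is a hypothetical stand-in, not the Kafka Streams source; only the method name and the message text are taken from the stack trace, and the partition count of 16 is an assumption made for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the check named in the stack trace; NOT Kafka code.
final class ChangelogPartitionCheckSketch {

    // Builds the partition ids 0..count-1 of an imaginary changelog topic.
    static Set<Integer> partitions(int count) {
        Set<Integer> ids = new TreeSet<>();
        for (int p = 0; p < count; p++) {
            ids.add(p);
        }
        return ids;
    }

    // Fails when the task's partition is missing from the changelog topic.
    static void validatePartitionExists(String store, String changelogTopic,
                                        Set<Integer> existing, int taskPartition) {
        if (!existing.contains(taskPartition)) {
            throw new IllegalStateException("Store " + store + "'s change log ("
                    + changelogTopic + ") does not contain partition " + taskPartition);
        }
    }

    public static void main(String[] args) {
        String store = "KSTREAM-JOINTHIS-0000000017-store";
        String topic = "Jord-" + store + "-changelog";
        // Assumption: the changelog topic already exists with 16 partitions (0..15).
        Set<Integer> existing = partitions(16);

        validatePartitionExists(store, topic, existing, 15); // passes silently
        try {
            validatePartitionExists(store, topic, existing, 16); // task 16 has no partition
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the mismatch would come from the changelog topic having been created for a topology with fewer tasks than the new one needs; the traces alone do not confirm that.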


On Tue, Sep 5, 2017 at 23:02, Jeff Widman <jeff@jeffwidman.com> wrote:

> I haven't played with the Streams app, but I know that at the consumer
> group level, consumers who are part of the same group can have different
> subscriptions. So at the consumer group level, at least, a rolling upgrade
> is theoretically possible (and in fact this was purposefully designed to
> support that use case).
>
> On Tue, Sep 5, 2017 at 7:00 AM, Håkon Åmdal <hakon.amdal2@gmail.com>
> wrote:
>
> > Hi there,
> >
> > In our company, we are running multiple Kafka Streams apps that are
> > deployed in a red/black fashion. A deployment means starting a new set
> > of hosts, running them in parallel with the old hosts until they pass
> > the health check, and then scaling down the old hosts. This approach
> > works very well, as we can deploy application changes without any
> > downtime.
> >
> > However, as far as I understand, there is no way we can run a new and
> > an old version in parallel if they don’t consume from the same set of
> > topics. Consumers will try to rebalance tasks between the two different
> > application versions, causing errors like the one below:
> >
> > Uncaught exception: Thread xyz-StreamThread-1 stopped unexpectedly after
> > Assigned partition foo-1 for non-subscribed topic regex pattern;
> > subscription pattern is bar
> >
> > To mitigate the problem, we have so far scaled down the old cluster to 0
> > instances before deploying the new application if we’re adding or
> > removing input topics. However, this causes service downtime, which we
> > can no longer accept.
> >
> > I’m curious to hear if anyone is experiencing the same issues, or if
> > anyone has any thoughts or opinions. Are we doing something wrong, or is
> > this something that can be solved by the Kafka Consumer client?
> >
> > Thanks,
> >
> > Håkon
> >
>
>
>
> --
>
> *Jeff Widman*
> jeffwidman.com <http://www.jeffwidman.com/> | 740-WIDMAN-J (943-6265)
> <><
>
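
The point quoted above, that members of one consumer group may hold different subscriptions, can be illustrated with a toy assignor in plain Java. This is a simplified sketch and not the Kafka client's assignor: the class, the round-robin rule, and the host names are all hypothetical, and the topics `foo` and `bar` are borrowed from the error message in the original post. It only models plain consumer groups; it says nothing about how Kafka Streams assigns tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model of a group assignment with mixed subscriptions; NOT Kafka code.
final class MixedSubscriptionSketch {

    // Gives each partition of a topic only to members subscribed to that
    // topic, cycling through the eligible members in sorted order.
    static Map<String, List<String>> assign(Map<String, Set<String>> subscriptions,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            List<String> eligible = new ArrayList<>();
            for (String member : assignment.keySet()) {
                if (subscriptions.get(member).contains(topic.getKey())) {
                    eligible.add(member);
                }
            }
            if (eligible.isEmpty()) {
                continue; // nobody subscribed, so the topic stays unassigned
            }
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = eligible.get(p % eligible.size());
                assignment.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
        subscriptions.put("old-host", Set.of("bar"));        // old version: bar only
        subscriptions.put("new-host", Set.of("bar", "foo")); // new version adds foo
        Map<String, Integer> partitions = Map.of("bar", 2, "foo", 2);

        // old-host never receives a foo partition, because it never subscribed to foo.
        System.out.println(assign(subscriptions, partitions));
    }
}
```

In this model the old host only ever receives `bar` partitions, which is the property a rolling upgrade with changed input topics would rely on.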
