kafka-users mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: Kafka Streams Failed to rebalance error
Date Wed, 07 Jun 2017 21:17:15 GMT
Hi there,

This might be a bug. Would you mind opening a JIRA? Copy-pasting the message below is sufficient.

Thanks
Eno
> On 7 Jun 2017, at 21:38, João Peixoto <joao.hartimer@gmail.com> wrote:
> 
> I'm using Kafka Streams 0.10.2.1 and I still see this error
> 
> 2017-06-07 20:28:37.211  WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : Could not create task 0_31. Will retry.
> 
> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> 
> 
> It has been printing it for hours now, so it does not recover at all.
> The most worrying thing is that this stream definition does not even use
> state stores, it literally looks like this:
> 
>     KStreamBuilder builder = new KStreamBuilder();
>     KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
>     kStream.process(() -> processor);
>     new KafkaStreams(builder, streamsConfiguration);
> 
> The "processor" does its thing and calls "context().commit()" when done.
> That's it. Looking at the actual machine running the instance, the folders
> under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> 
> This seems to have been bootstrapped by the exception:
> 
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms, which
> typically implies that the poll loop is spending too much time message
> processing. You can address this either by increasing the session timeout
> or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
> 
> We are addressing the latter by reducing "max.poll.records" and increasing
> "commit.interval.ms". Nonetheless, shouldn't Kafka Streams ignore state
> dirs if there are no state stores? Since it doesn't seem to do so
> automatically, can I configure it somehow to achieve this end?
> 
> Additionally, what could lead to it not being able to recover?
> 
> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matthias@confluent.io>
> wrote:
> 
>> Great! :)
>> 
>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
>>> I see now that my Kafka cluster is very stable, and these errors no
>>> longer occur.
>>> 
>>> -Sameer.
>>> 
>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.work@gmail.com> wrote:
>>> 
>>>> Yes, I have upgraded both my cluster and my client to version 0.10.2.1 and
>>>> am currently monitoring the situation.
>>>> I will report back if I find any errors. Thanks for the help.
>>>> 
>>>> -Sameer.
>>>> 
>>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matthias@confluent.io>
>>>> wrote:
>>>> 
>>>>> Did you see Eno's reply?
>>>>> 
>>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
>>>>> please report back.
>>>>> 
>>>>> I would also recommend subscribing to the list. It's self-service:
>>>>> http://kafka.apache.org/contact
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
>>>>>> My brokers are on version 0.10.1.0 and my clients are on version 0.10.2.0.
>>>>>> Also, please reply to all; I am currently not subscribed to the list.
>>>>>> 
>>>>>> -Sameer.
>>>>>> 
>>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.work@gmail.com> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I ran two nodes in my streams compute cluster. They were running fine for
>>>>>>> a few hours before failing with "Failed to rebalance" errors.
>>>>>>> 
>>>>>>> 
>>>>>>> I couldn't understand why this happened, but I saw one strange behaviour...
>>>>>>> 
>>>>>>> At 16:53 on node1, I saw a "Failed to lock the state directory" error. This
>>>>>>> might have caused the partitions to relocate and hence the error.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I am attaching detailed logs for both the nodes; please see if you can
>>>>>>> help.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Here are some of the logs for quick reference.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
>>>>>>> 
>>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>>>>>>     ... 1 more
>>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
>>>>>>> Commit cannot be completed since the group has already rebalanced and
>>>>>>> assigned the partitions to another member. This means that the time between
>>>>>>> subsequent calls to poll() was longer than the configured
>>>>>>> max.poll.interval.ms, which typically implies that the poll loop is
>>>>>>> spending too much time message processing. You can address this either by
>>>>>>> increasing the session timeout or by reducing the maximum size of batches
>>>>>>> returned in poll() with max.poll.records.
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>>>>>>>     ... 10 more
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2017-05-03 16:53:57 WARN  StreamThread:1184 - Could not create task 1_38. Will retry.
>>>>>>> 
>>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>>>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>>>>>> 
>>>>>>> 
>>>>>>> Regards,
>>>>>>> 
>>>>>>> -Sameer.
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 

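João's message in the thread above notes that the task folders under /tmp/kafka-streams/<stream name>/ contain only a .lock file, and the LockException reports a failure to lock the state directory. As a rough illustration of how a per-directory lock of this kind can behave, the sketch below uses plain java.nio file locks. It is not the Kafka Streams implementation: the class name StateDirLockSketch, the helpers tryLock and describe, and the temporary directory layout are all made up for this example. It relies on the documented java.nio behaviour that a second lock attempt on a region already locked within the same JVM raises OverlappingFileLockException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch; names and layout are illustrative, not Kafka's code.
final class StateDirLockSketch {

    // Try to take an exclusive lock on the whole file behind the channel.
    // Returns null when the lock is already held (by this JVM or another process).
    static FileLock tryLock(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // The same JVM already holds a lock on this file.
            return null;
        }
    }

    static String describe(FileLock lock) {
        return lock != null ? "acquired" : "denied";
    }

    // Lock, try to lock again while the first lock is held, release, lock again.
    static String demo() {
        try {
            // Stand-in for a task directory such as <state.dir>/<application id>/0_31.
            Path taskDir = Files.createTempDirectory("kafka-streams-sketch").resolve("0_31");
            Files.createDirectories(taskDir);
            try (FileChannel channel = FileChannel.open(taskDir.resolve(".lock"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                FileLock first = tryLock(channel);
                FileLock second = tryLock(channel); // attempted while the first lock is held
                String result = describe(first) + "," + describe(second);
                if (first != null) {
                    first.release();
                }
                FileLock third = tryLock(channel);  // attempted after the release
                return result + "," + describe(third);
            } // closing the channel releases any lock still held through it
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Under these assumptions, a lock that is never released keeps every later attempt failing, which would be consistent with the endless "Will retry" warnings quoted above. The thread itself does not confirm that this is the cause.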

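The quoted message mentions reducing max.poll.records and increasing commit.interval.ms, and the CommitFailedException text refers to max.poll.interval.ms. A minimal sketch of how those settings might be collected before being passed to new KafkaStreams(builder, streamsConfiguration) follows. The keys are the standard Kafka configuration names; the class name StreamsTuningSketch, every value, the application id, the broker address, and the state.dir path are illustrative assumptions, not recommendations from this thread. Plain string keys are used so that the snippet compiles without the Kafka jars on the classpath.

```java
import java.util.Properties;

// Hypothetical sketch; all values below are placeholders, not tuned settings.
final class StreamsTuningSketch {

    static Properties buildConfig() {
        Properties props = new Properties();

        // Required by Kafka Streams; both values are made up for this example.
        props.setProperty("application.id", "example-stream");
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Fewer records per poll() means less processing time between polls.
        props.setProperty("max.poll.records", "100");

        // Upper bound on the time allowed between two poll() calls.
        props.setProperty("max.poll.interval.ms", "600000");

        // How often Kafka Streams commits on its own.
        props.setProperty("commit.interval.ms", "60000");

        // Where task directories (and their .lock files) are created.
        // The path is an assumption; the default is /tmp/kafka-streams.
        props.setProperty("state.dir", "/var/lib/kafka-streams");

        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that state.dir only changes where the task directories are created. Nothing in this thread establishes a setting that turns them off for topologies without state stores.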