kafka-users mailing list archives

From Henri Pihkala <henri.pihk...@streamr.com>
Subject Topic in bad state after controller to broker messaging fails
Date Tue, 06 Jan 2015 22:17:02 GMT
Hi,

I’m hitting a strange problem using 0.8.2-beta with just a single Kafka broker on CentOS 6.5.

Some of my topic creation attempts fail at random, leaving the new topic in a state where
it cannot be used because of “partition doesn’t exist” errors, as seen in server.log
below.

In controller.log, it looks like the controller fails to send either the “become-leader
LeaderAndIsr request” or the “UpdateMetadata request” to the broker (which is in fact the
same Kafka instance) because a socket read fails for an unknown reason.

My questions:

(1) Is the bad topic state a result of the message not making it from the controller to the
broker?

(2) Any idea why the socket read might randomly fail? It can’t be a network issue since
we’re running a single instance.

(3) Shouldn’t the controller try to resend the message?
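
To make question (3) concrete, here is a minimal Python sketch of the behaviour I would have expected: the sender keeps the failed request and sends it again after reconnecting. This is not Kafka's actual code. `FlakyChannel`, `send_with_retry` and the attempt limit are hypothetical names invented purely for illustration.

```python
class FlakyChannel:
    """Hypothetical stand-in for a controller-to-broker channel.

    The first `failures_before_success` sends raise EOFError, mimicking
    the "socket has likely been closed" failure in controller.log.
    """

    def __init__(self, failures_before_success):
        self.failures_left = failures_before_success
        self.delivered = []
        self.reconnects = 0

    def send(self, request):
        if self.failures_left > 0:
            self.failures_left -= 1
            raise EOFError("socket has likely been closed")
        self.delivered.append(request)

    def reconnect(self):
        self.reconnects += 1


def send_with_retry(channel, request, max_attempts=3):
    """Send `request`, reconnecting and resending after each failure.

    Returns the number of attempts used. If every attempt fails, the
    last EOFError is re-raised so the caller still sees the failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            channel.send(request)
            return attempt
        except EOFError:
            if attempt == max_attempts:
                raise
            channel.reconnect()


channel = FlakyChannel(failures_before_success=1)
attempts = send_with_retry(channel, "UpdateMetadataRequest correlationId=16")
print(attempts, channel.delivered)
```

The point of the sketch is only that the request survives the reconnect instead of being dropped; whether the real controller does this is exactly what I am asking.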



controller.log

[2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
(kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
(kafka.controller.KafkaController)

[2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]: Invoking state change
to NewPartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]: Invoking state change
to NewReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
(kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]: Invoking state change
to OnlinePartition for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]: Live assigned replicas
for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)] (kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]: Initializing leader
and isr for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] to (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2)
(kafka.controller.PartitionStateMachine)

[2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]: Invoking state change
to OnlineReplica for replicas [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
(kafka.controller.ReplicaStateMachine)

[2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread], Controller 0 fails
to send a request to broker id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
        at kafka.utils.Utils$.read(Utils.scala:381)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], Controller 0 epoch
2 failed to send request Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0]
-> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
        at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
        at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
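
For anyone who wants to check their own controller.log for the same failure, here is a small Python sketch that pulls the request name, correlation id and partition out of ERROR lines like the one above. The regular expression is my own and only targets the line format shown in this excerpt. It assumes each log entry sits on a single line in the file (the excerpt here is wrapped by the mail client), and `find_failed_sends` is a hypothetical helper name.

```python
import re

# Matches the "failed to send request" ERROR line from controller.log.
# Only the fields needed to identify the affected partition are captured.
FAILED_SEND = re.compile(
    r"failed to send request Name:(?P<name>\w+);"
    r".*?CorrelationId:(?P<corr>\d+);"
    r".*?PartitionState:\[(?P<topic>[^,\]]+),(?P<partition>\d+)\]"
)


def find_failed_sends(lines):
    """Return (request name, correlation id, topic, partition) per failed send."""
    results = []
    for line in lines:
        match = FAILED_SEND.search(line)
        if match:
            results.append((
                match.group("name"),
                int(match.group("corr")),
                match.group("topic"),
                int(match.group("partition")),
            ))
    return results


# The ERROR line from the excerpt above, joined back into one line.
sample = (
    "[2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread], "
    "Controller 0 epoch 2 failed to send request Name:UpdateMetadataRequest;"
    "Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;"
    "ClientId:id_0-host_dev.unifina-port_9092;"
    "AliveBrokers:id:0,host:dev.unifina,port:9092;"
    "PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0] -> "
    "(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),"
    "ReplicationFactor:1),AllReplicas:0) to broker id:0,host:dev.unifina,"
    "port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)"
)

failed = find_failed_sends([sample, "some unrelated INFO line"])
print(failed)
```

To scan a real file, pass an open file handle instead of the list, for example `find_failed_sends(open("controller.log"))`, with the path adjusted to wherever your logs live.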




state-change.log

[2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0]
state from NonExistentPartition to NewPartition with assigned replicas 0 (state.change.logger)

[2015-01-06 21:31:10,308] TRACE Controller 0 epoch 2 changed state of replica 0 for partition
[09b1ebac-7036-49fc-aa61-7852808ca241,0] from NonExistentReplica to NewReplica (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed partition [09b1ebac-7036-49fc-aa61-7852808ca241,0]
from NewPartition to OnlinePartition with leader 0 (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending become-leader LeaderAndIsr request
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2) with correlationId 16 to broker 0 for partition
[09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2)
with correlationId 16 to broker 0 for partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] (state.change.logger)

[2015-01-06 21:31:10,501] TRACE Controller 0 epoch 2 changed state of replica 0 for partition
[09b1ebac-7036-49fc-aa61-7852808ca241,0] from NewReplica to OnlineReplica (state.change.logger)




server.log

[2015-01-06 22:01:48,137] WARN [KafkaApi-0] Offset request with correlation id 0 from client
console-consumer-34042-ConsumerFetcherThread-console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-0-0
on partition [09b1ebac-7036-49fc-aa61-7852808ca241,0] failed due to Partition [09b1ebac-7036-49fc-aa61-7852808ca241,0]
doesn't exist on 0 (kafka.server.KafkaApis)

[2015-01-06 22:01:48,140] INFO Closing socket connection to /192.168.10.21. (kafka.network.Processor)

... etc etc


describe topic:

$ bin/kafka-topics.sh --zookeeper dev.unifina:2181 --describe --topic 09b1ebac-7036-49fc-aa61-7852808ca241
Topic:09b1ebac-7036-49fc-aa61-7852808ca241      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: 09b1ebac-7036-49fc-aa61-7852808ca241     Partition: 0    Leader: 0       Replicas: 0     Isr: 0


attempt to consume from the topic using the console consumer:

[2015-01-06 22:01:47,928] WARN [console-consumer-34042_dev.unifina-1420581705389-b3ba0eb6-leader-finder-thread],
Failed to add leader for partitions [09b1ebac-7036-49fc-aa61-7852808ca241,0]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.UnknownTopicOrPartitionException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.lang.Class.newInstance0(Class.java:372)
        at java.lang.Class.newInstance(Class.java:325)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:168)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:175)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:175)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
        at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Thanks for your help!

Best regards
Henri

