kafka-users mailing list archives

From Alex Demidko <alexan...@metamarkets.com>
Subject Re: KafkaException: This operation cannot be completed on a complete request
Date Sat, 19 Apr 2014 00:02:23 GMT
I tried to reproduce this one more time. I was using kill -9 to test
reliability; with graceful termination I haven't seen this problem
arise. The leader node started complaining that ReplicaFetcherThread can't
connect to the other node and that the producer can't send requests to the
terminated node, but I guess that is to be expected.

Overall, the pattern that reproduces it more or less reliably is the following:

1. Node 1 (RestartedNode) and Node 2 (LeaderNode) are both in the ISR, with
   balanced leadership.
2. Kill -9 node 1.
3. Start node 1.
4. Node 1 recovers segments and starts the broker with a slightly
   suspicious message: kafka.utils.ZkUtils$ - conflict in /controller data:
   {"version":1,"brokerid":1,"timestamp":"1397864759089"} stored data:
   {"version":1,"brokerid":2,"timestamp":"1397863473709"}
5. Node 2 expands the ISR for all partitions from 2 to 2,1.
6. Node 2 starts emitting "Closing socket for /Node1 because of error
   KafkaException: This operation cannot be completed on a complete
   request", and Node 1 emits "EOFException: Received -1 when reading from
   channel, socket has likely been closed".
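
To check several brokers for this pattern without reading the logs by hand, the signature messages from the steps above can be counted per log. This is a minimal sketch, assuming plain-text broker logs with one message per line. `SIGNATURES` and `count_signatures` are hypothetical helpers for illustration and are not part of Kafka.

```python
from collections import Counter

# Substrings copied from the messages quoted in this thread.
# The dictionary keys are made-up labels, not Kafka identifiers.
SIGNATURES = {
    "leader_complete_request": "This operation cannot be completed on a complete request",
    "follower_eof": "Received -1 when reading from channel",
    "controller_conflict": "conflict in /controller data",
}


def count_signatures(lines):
    """Count how many lines contain each known signature substring."""
    counts = Counter()
    for line in lines:
        for label, needle in SIGNATURES.items():
            if needle in line:
                counts[label] += 1
    return counts


if __name__ == "__main__":
    # Hypothetical sample; in practice pass an open log file instead.
    sample = [
        "ERROR kafka.network.Processor - Closing socket for /Node1 because of error",
        "kafka.common.KafkaException: This operation cannot be completed on a complete request.",
        "java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.",
    ]
    for label, n in sorted(count_signatures(sample).items()):
        print(label, n)
```

A leader log with a steadily growing `leader_complete_request` count next to a follower log with a matching `follower_eof` count would correspond to the last step of the pattern.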

There are no other exceptions in the log.
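
A possible angle on the sizes involved, offered as an assumption rather than a confirmed diagnosis: the FetchRequest quoted further down lists 36 partitions, each with a fetch size of 104857600 bytes (the configured replica.fetch.max.bytes). If the response size is carried as a signed 32-bit integer, which is assumed here, the worst-case response for such a request does not fit. The sketch below only does that arithmetic with numbers copied from this thread; the function names are made up for illustration.

```python
INT32_MAX = 2**31 - 1  # largest value of a signed 32-bit integer

# Values copied from the quoted logs and configuration in this thread.
REPLICA_FETCH_MAX_BYTES = 104857600  # replica.fetch.max.bytes
PARTITIONS_IN_REQUEST = 36           # PartitionFetchInfo entries in the quoted FetchRequest
OBSERVED_ALLOCATION = 2022780218     # from "OOME with size 2022780218"


def worst_case_response_bytes(partitions, fetch_max_bytes):
    """Upper bound on the payload if every partition returns a full fetch."""
    return partitions * fetch_max_bytes


def max_partitions_within_int32(fetch_max_bytes):
    """How many full-size partition fetches still fit in a signed 32-bit size."""
    return INT32_MAX // fetch_max_bytes


if __name__ == "__main__":
    worst = worst_case_response_bytes(PARTITIONS_IN_REQUEST, REPLICA_FETCH_MAX_BYTES)
    print("worst case bytes:", worst)
    print("exceeds int32:", worst > INT32_MAX)
    print("partitions that fit:", max_partitions_within_int32(REPLICA_FETCH_MAX_BYTES))
    print("observed allocation fits int32:", OBSERVED_ALLOCATION <= INT32_MAX)
```

With these numbers the bound is 3774873600 bytes, which is above 2147483647, while the allocation reported in the OOME is just under that limit. Whether this is related to the exception on the leader is not established in this thread.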



On Fri, Apr 18, 2014 at 3:21 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> When you were shutting down the restarted node, did you see any
> warnings/errors in the leader logs?
>
> Guozhang
>
>
> On Fri, Apr 18, 2014 at 1:58 PM, Alex Demidko <alexander@metamarkets.com> wrote:
>
> > The last time I saw this exception was when I tried to rebalance leadership
> > with kafka-preferred-replica-election.sh. This is what I got in the logs:
> >
> > LeaderNode: just kafka.common.KafkaException: This operation cannot be
> > completed on a complete request without any other exceptions.
> >
> >
> > RestartedNode:
> >
> > 2014-04-18 20:43:39,281 INFO  [ReplicaFetcherThread-2-1] kafka.log.Log -
> > Rolled new log segment for 'loadtest-170' in 1 ms.
> > java.lang.OutOfMemoryError: Java heap space
> > Dumping heap to java_pid28305.hprof ...
> > Heap dump file created [13126900997 bytes in 11.728 secs]
> > 2014-04-18 20:44:02,299 INFO  [main-SendThread]
> > org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard
> > from server in 28101ms for sessionid 0x7455b0b68302016, closing socket
> > connection and attempting reconnect
> > 2014-04-18 20:44:02,328 ERROR [ReplicaFetcherThread-2-1]
> > kafka.network.BoundedByteBufferReceive - OOME with size 2022780218
> > java.lang.OutOfMemoryError: Java heap space
> >         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> >         at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> >         at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > (...)
> > 2014-04-18 20:44:41,212 ERROR [ZkClient-EventThread]
> > org.I0Itec.zkclient.ZkEventThread - Error handling event ZkEvent[New
> > session event sent to
> > kafka.controller.KafkaController$SessionExpirationListener@7ea9f7b8]
> > java.lang.IllegalStateException: Kafka scheduler has not been started
> >         at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:116)
> >         at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> >         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:349)
> >         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339)
> >         at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339)
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >         at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:339)
> >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
> >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> >         at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >         at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
> >         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
> >         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> > 2014-04-18 20:44:44,371 INFO  [Thread-1-EventThread]
> > org.apache.zookeeper.ClientCnxn - EventThread shut down
> > 2014-04-18 20:44:55,526 INFO  [ZkClient-EventThread]
> > kafka.server.KafkaHealthcheck - re-registering broker info in ZK for broker 2
> > 2014-04-18 20:44:58,692 INFO  [ZkClient-EventThread] kafka.utils.ZkUtils$ -
> > Registered broker 2 at path /brokers/ids/2 with address
> > 2014-04-18 20:45:01,020 INFO  [ZkClient-EventThread-15]
> > kafka.server.KafkaHealthcheck - done re-registering broker
> > 2014-04-18 20:45:07,996 INFO  [ZkClient-EventThread-15]
> > kafka.server.KafkaHealthcheck - Subscribing to /brokers/topics path to
> > watch for new topics
> > 2014-04-18 20:45:07,998 WARN  [kafka-request-handler-20]
> > state.change.logger - Broker 2 received invalid LeaderAndIsr request with
> > correlation id 11 from controller 1 epoch 13 with an older leader epoch 22
> > for partition [loadtest,66], current leader epoch is 22
> > (...)
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> >         at kafka.utils.Utils$.read(Utils.scala:376)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> >         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > Restarting RestartedNode doesn't help; only restarting the LeaderNode
> > helps, which I'd really like to avoid because the leader is the only
> > in-sync replica (and there might also be multiple leaders for different
> > partitions).
> >
> > Another question is why there is an OOME on the RestartedNode.
> >
> >
> >
> >
> >
> > On Fri, Apr 18, 2014 at 1:30 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Hello Alex,
> > >
> > > I think this is a bug in the FetchResponseSend class. Just to confirm,
> > > before the
> > >
> > > kafka.common.KafkaException: This operation cannot be completed on a
> > > complete request.
> > >
> > > do you see other warn/error logs on the current leader?
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Apr 18, 2014 at 11:57 AM, Alexander Demidko <alexander@metamarkets.com> wrote:
> > >
> > > > I have tried to reproduce this error, and it occurs pretty consistently
> > > > when a node is forcefully shut down without graceful termination. When
> > > > graceful shutdown was successful, no errors appeared in the log when the
> > > > instance was restarted.
> > > >
> > > >
> > > > On Fri, Apr 18, 2014 at 11:17 AM, Alex Demidko <alexander@metamarkets.com> wrote:
> > > >
> > > > > These are from the alive node:
> > > > >
> > > > > 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
> > > > > state.change.logger - Controller 2 epoch 8 encountered error while electing
> > > > > leader for partition [loadtest,143] due to: Preferred replica 1 for
> > > > > partition [loadtest,143] is either not alive or not in the isr. Current
> > > > > leader and ISR: [{"leader":2,"leader_epoch":11,"isr":[2]}].
> > > > > 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
> > > > > state.change.logger - Controller 2 epoch 8 initiated state change for
> > > > > partition [loadtest,143] from OnlinePartition to OnlinePartition failed
> > > > >
> > > > > 2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread]
> > > > > kafka.controller.RequestSendThread -
> > > > > [Controller-2-to-broker-1-send-thread], Controller 2's connection to broker
> > > > > id:1,host:<RESTARTED_BROKER_IP>,port:9092 was unsuccessful
> > > > > 2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread]
> > > > > kafka.controller.RequestSendThread -
> > > > > [Controller-2-to-broker-1-send-thread], Controller 2 epoch 8 failed to send
> > > > > UpdateMetadata request with correlation id 3854 to broker
> > > > > id:1,host:<RESTARTED_BROKER_IP>,port:9092. Reconnecting to broker.
> > > > >
> > > > >
> > > > > On Apr 18, 2014, at 10:41 AM, Jun Rao <junrao@gmail.com> wrote:
> > > > >
> > > > > > Any errors from the controller/state-change log?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko <alexander@metamarkets.com> wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >>
> > > > > >> I'm performing a producing load test on a two-node Kafka cluster built from
> > > > > >> the latest 0.8.1 branch sources. I have a topic loadtest with replication
> > > > > >> factor 2 and 256 partitions. Initially both brokers are in the ISR and
> > > > > >> leadership is balanced. When one broker was restarted in the middle of the
> > > > > >> load test (it wasn't able to complete controlled shutdown in the specified
> > > > > >> time and was killed), I started receiving the following errors which, as far
> > > > > >> as I understand, come from replication:
> > > > > >>
> > > > > >>
> > > > > >> On restarted broker
> > > > > >>
> > > > > >> 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2]
> > > > > >> kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in
> > > > > >> fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId:
> > > > > >> ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1
> > > > > >> bytes; RequestInfo: [loadtest2,71] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,85] ->
> > > > > >> PartitionFetchInfo(113676000,104857600),[loadtest,189] ->
> > > > > >> PartitionFetchInfo(112277000,104857600),[loadtest,21] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,205] ->
> > > > > >> PartitionFetchInfo(112986000,104857600),[loadtest,141] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,253] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,77] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,61] ->
> > > > > >> PartitionFetchInfo(112490000,104857600),[loadtest,229] ->
> > > > > >> PartitionFetchInfo(112805000,104857600),[loadtest,133] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest2,15] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest2,63] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,181] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,5] ->
> > > > > >> PartitionFetchInfo(112530000,104857600),[loadtest,29] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,45] ->
> > > > > >> PartitionFetchInfo(113113000,104857600),[loadtest2,39] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,37] ->
> > > > > >> PartitionFetchInfo(112145000,104857600),[loadtest,13] ->
> > > > > >> PartitionFetchInfo(112915000,104857600),[loadtest,237] ->
> > > > > >> PartitionFetchInfo(112896000,104857600),[loadtest,149] ->
> > > > > >> PartitionFetchInfo(113232000,104857600),[loadtest,117] ->
> > > > > >> PartitionFetchInfo(113100000,104857600),[loadtest,157] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,165] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,101] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,93] ->
> > > > > >> PartitionFetchInfo(113025000,104857600),[loadtest,125] ->
> > > > > >> PartitionFetchInfo(112896000,104857600),[loadtest,197] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,109] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,245] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,213] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,53] ->
> > > > > >> PartitionFetchInfo(0,104857600),[loadtest,173] ->
> > > > > >> PartitionFetchInfo(112757000,104857600),[loadtest,69] ->
> > > > > >> PartitionFetchInfo(112378000,104857600),[loadtest,221] ->
> > > > > >> PartitionFetchInfo(0,104857600)
> > > > > >> java.io.EOFException: Received -1 when reading from channel, socket has
> > > > > >> likely been closed.
> > > > > >>        at kafka.utils.Utils$.read(Utils.scala:376)
> > > > > >>        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > > >>        at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > > >>        at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > > >>        at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > > >>        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > > > >>        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > > >>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > > >>        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > > >>        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > > >>        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > > > >>        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > > > >>        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > > > >>        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > > >> 2014-04-18 16:15:02,215 WARN  [ReplicaFetcherThread-1-2]
> > > > > >> kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
> > > > > >>
> > > > > >>
> > > > > >> On current leader
> > > > > >>
> > > > > >> 2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1]
> > > > > >> kafka.network.Processor - Closing socket for /10.41.133.59 because of
> > > > > >> error
> > > > > >> kafka.common.KafkaException: This operation cannot be completed on a
> > > > > >> complete request.
> > > > > >>        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
> > > > > >>        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
> > > > > >>        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
> > > > > >>        at kafka.network.Processor.write(SocketServer.scala:375)
> > > > > >>        at kafka.network.Processor.run(SocketServer.scala:247)
> > > > > >>        at java.lang.Thread.run(Thread.java:744)
> > > > > >>
> > > > > >>
> > > > > >> These errors are constantly bubbling up in the logs, and the restarted
> > > > > >> broker never made it back to the ISR even when the load test was stopped.
> > > > > >>
> > > > > >>
> > > > > >> Kafka configuration:
> > > > > >>
> > > > > >> host.name                                   = #{IP_ADDR}
> > > > > >> port                                        = 9092
> > > > > >> socket.send.buffer.bytes                    = 1048576
> > > > > >> socket.receive.buffer.bytes                 = 1048576
> > > > > >> socket.request.max.bytes                    = 104857600
> > > > > >> num.io.threads                              = 24
> > > > > >> queued.max.requests                         = 1024
> > > > > >> fetch.purgatory.purge.interval.requests     = 1024
> > > > > >> producer.purgatory.purge.interval.requests  = 1024
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> broker.id                                   = #{BROKER_ID}
> > > > > >>
> > > > > >> log.flush.interval.messages                 = 100000
> > > > > >> log.flush.scheduler.interval.ms             = 1000
> > > > > >> log.flush.interval.ms                       = 2000
> > > > > >>
> > > > > >> log.dirs                                    = \
> > > > > >> /mnt1/tmp/kafka-logs,/mnt2/tmp/kafka-logs,/mnt3/tmp/kafka-logs,/mnt4/tmp/kafka-logs,\
> > > > > >> /mnt5/tmp/kafka-logs,/mnt6/tmp/kafka-logs,/mnt7/tmp/kafka-logs,/mnt8/tmp/kafka-logs,\
> > > > > >> /mnt9/tmp/kafka-logs,/mnt10/tmp/kafka-logs,/mnt11/tmp/kafka-logs,/mnt12/tmp/kafka-logs,\
> > > > > >> /mnt13/tmp/kafka-logs,/mnt14/tmp/kafka-logs,/mnt15/tmp/kafka-logs,/mnt16/tmp/kafka-logs,\
> > > > > >> /mnt17/tmp/kafka-logs,/mnt18/tmp/kafka-logs,/mnt19/tmp/kafka-logs,/mnt20/tmp/kafka-logs,\
> > > > > >> /mnt21/tmp/kafka-logs,/mnt22/tmp/kafka-logs,/mnt23/tmp/kafka-logs,/mnt24/tmp/kafka-logs
> > > > > >>
> > > > > >> log.segment.bytes                           = 1000000000
> > > > > >> log.roll.hours                              = 1
> > > > > >>
> > > > > >> log.retention.minutes                       = 10080
> > > > > >> log.retention.check.interval.ms             = 300000
> > > > > >> log.cleanup.policy                          = delete
> > > > > >>
> > > > > >> log.index.size.max.bytes                    = 10485760
> > > > > >>
> > > > > >> num.partitions                              = 256
> > > > > >> auto.create.topics.enable                   = false
> > > > > >>
> > > > > >>
> > > > > >> default.replication.factor                      = 2
> > > > > >> replica.lag.time.max.ms                         = 15000
> > > > > >> replica.lag.max.messages                        = 750000
> > > > > >> num.replica.fetchers                            = 8
> > > > > >> replica.socket.timeout.ms                       = 30000
> > > > > >> replica.socket.receive.buffer.bytes             = 1048576
> > > > > >> replica.fetch.max.bytes                         = 104857600
> > > > > >> replica.fetch.wait.max.ms                       = 1000
> > > > > >> replica.fetch.min.bytes                         = 1
> > > > > >> replica.high.watermark.checkpoint.interval.ms   = 5000
> > > > > >>
> > > > > >> controlled.shutdown.enable                      = true
> > > > > >> controlled.shutdown.max.retries                 = 1
> > > > > >> controlled.shutdown.retry.backoff.ms            = 300000
> > > > > >>
> > > > > >> auto.leader.rebalance.enable                    = true
> > > > > >> leader.imbalance.per.broker.percentage          = 10
> > > > > >> leader.imbalance.check.interval.seconds         = 300
> > > > > >>
> > > > > >>
> > > > > >> zookeeper.connect                           = #{ZK_PATH}
> > > > > >> zookeeper.session.timeout.ms                = 30000
> > > > > >> zookeeper.connection.timeout.ms             = 30000
> > > > > >>
> > > > > >>
> > > > > >> Why might this happen?
> > > > > >>
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Alex
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
