kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jason Rosenberg <...@squareup.com>
Subject Re: Kafka restart takes a long time
Date Sun, 23 Nov 2014 16:47:38 GMT
Rajiv,

So, any time a broker's disk fills up, it will shut itself down immediately
(it will do this in response to any IO error on writing to disk).
Unfortunately, this means that the node will not be able to do any
housecleaning before shutdown, which is an 'unclean' shutdown.  This means
that when it restarts, it needs to reset the data to the last known
checkpoint.  If the partition is replicated, and it can restore it from
another broker, it will try to do that (but it doesn't sound like it can do
that in your case, since all the other nodes are down too).

There is a fix coming in 0.8.2 that will allow a broker to restore multiple
partitions in parallel (but the current behavior in 0.8.1.1 and prior is to
restore partitions 1 by 1). See:
https://issues.apache.org/jira/browse/KAFKA-1414.  This fix should speed
things up greatly when you have a large number of partitions.

If a disk is full, the broker will refuse to even start up (or will fail
immediately on the first write attempt and shut itself down).  So,
generally, in this event, you need to clear some disk space before trying
to restart the server.

The bottom line is that you don't want any of your brokers to run out of
disk space (thus you need to have good monitoring/alerting for advance
warning on this).  Kafka doesn't attempt to detect if it's about to run out
of space and die, so you have to manage that and guard against it outside
of kafka.

Jason

On Sat, Nov 22, 2014 at 5:27 PM, Harsha <kafka@harsha.io> wrote:

> It might logs check your kafka logs dir (server logs) . Kafka can
> produce lot of logs in a quick time make sure thats whats in play here.
> -Harsha
>
> On Sat, Nov 22, 2014, at 01:37 PM, Rajiv Kurian wrote:
> > Actually see a bunch of errors. One of the brokers is out of space and
> > this
> > might be causing everything to spin out of control.
> >
> > Some logs:
> >
> > On *broker 1* (the one that has run out of space):
> >
> > 2014-11-22T21:20:42.790Z FATAL [ReplicaFetcherThread-1-13          ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-1-13], Disk
> > error while replicating data.
> >
> > kafka.common.KafkaStorageException: I/O exception in append to log
> > 'mytopic-633'
> >
> >         at kafka.log.Log.append(Log.scala:283)
> >
> >         at
> >
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >
> >         at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >
> >         at
> >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Caused by: java.io.IOException: No space left on device
> >
> >         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> >
> >         at
> >         sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
> >
> >         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> >
> >         at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> >
> >         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
> >
> >         at
> >
> kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:132)
> >
> >         at kafka.log.FileMessageSet.append(FileMessageSet.scala:210)
> >
> >         at kafka.log.LogSegment.append(LogSegment.scala:80)
> >
> >         at kafka.log.Log.append(Log.scala:269)
> >
> >         ... 13 more
> >
> > 2014-11-22T21:20:42.791Z ERROR [ReplicaFetcherThread-2-13          ]
> > [kafka.server.ReplicaFetcherThread   ]: [ReplicaFetcherThread-2-13],
> > Error
> > getting offset for partition [myTopic,0] to broker 13
> >
> > java.io.IOException: No space left on device
> >
> >         at java.io.FileOutputStream.writeBytes(Native Method)
> >
> >         at java.io.FileOutputStream.write(FileOutputStream.java:345)
> >
> >         at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> >
> >         at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> >
> >         at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> >
> >         at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
> >
> >         at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
> >
> >         at java.io.BufferedWriter.write(BufferedWriter.java:230)
> >
> >         at java.io.Writer.write(Writer.java:157)
> >
> >         at
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:50)
> >
> >         at
> >
> kafka.server.OffsetCheckpoint$$anonfun$liftedTree1$1$1.apply(OffsetCheckpoint.scala:49)
> >
> >         at
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> >
> >         at
> >
> scala.collection.immutable.MapLike$$anon$2$$anonfun$foreach$3.apply(MapLike.scala:106)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.MapLike$$anon$2.foreach(MapLike.scala:106)
> >
> >         at
> > kafka.server.OffsetCheckpoint.liftedTree1$1(OffsetCheckpoint.scala:49)
> >
> >         at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:39)
> >
> >         at
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:234)
> >
> >         at
> >
> kafka.log.LogManager$$anonfun$checkpointRecoveryPointOffsets$1.apply(LogManager.scala:231)
> >
> >         at
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >
> >         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >
> >         at
> > kafka.log.LogManager.checkpointRecoveryPointOffsets(LogManager.scala:231)
> >
> >         at kafka.log.LogManager.truncateTo(LogManager.scala:204)
> >
> >         at
> >
> kafka.server.ReplicaFetcherThread.handleOffsetOutOfRange(ReplicaFetcherThread.scala:84)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:144)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> >
> >         at kafka.utils.Utils$.inLock(Utils.scala:538)
> >
> >         at
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> >
> >         at
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> >
> >         at
> >         kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> >
> > *On broker 2:*
> >
> > 2014-11-22T21:20:19.629Z ERROR [request-expiration-task            ]
> > [kafka.server.KafkaApis              ]: [KafkaApi-12] Error when
> > processing
> > fetch request for partition [myTopic,265] offset 415659 from follower
> > with
> > correlation id 0
> >
> > kafka.common.OffsetOutOfRangeException: Request for offset 415659 but we
> > only have log segments in the range 0 to 410453.
> >
> >         at kafka.log.Log.read(Log.scala:377)
> >
> >         at
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:401)
> >
> >         at
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:347)
> >
> >         at
> >
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:343)
> >
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >
> >         at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >
> >         at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> >
> >         at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> >
> >         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
> >
> >         at
> >
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:343)
> >
> >         at
> > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:704)
> >
> >         at
> > kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:686)
> >
> >         at
> >
> kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
> >
> >
> > *On broker 3*:
> >
> >
> > 2014-11-22T21:26:48.216Z INFO  [kafka-request-handler-3            ]
> > [fka.controller.PartitionStateMachine]: [Partition state machine on
> > Controller 13]: Invoking state change to OnlinePartition for partitions
> > [myTopic,122]
> >
> > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > [state.change.logger                 ]: Controller 13 epoch 132
> > encountered
> > error while electing leader for partition [myTopic,122] due to: No other
> > replicas in ISR 13 for [myTopic,122] besides shutting down brokers 13.
> >
> > 2014-11-22T21:26:48.218Z ERROR [kafka-request-handler-3            ]
> > [state.change.logger                 ]: Controller 13 epoch 132 initiated
> > state change for partition [myTopic,122] from OnlinePartition to
> > OnlinePartition failed
> >
> > kafka.common.StateChangeFailedException: encountered error while electing
> > leader for partition [myTopic,122] due to: No other replicas in ISR 13
> > for
> > [myTopic,122] besides shutting down brokers 13.
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:344)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:171)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:111)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> >
> >         at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:110)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:227)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4$$anonfun$apply$4.apply(KafkaController.scala:223)
> >
> >         at scala.Option.foreach(Option.scala:121)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:223)
> >
> >         at
> >
> kafka.controller.KafkaController$$anonfun$shutdownBroker$4.apply(KafkaController.scala:219)
> >
> >         at
> > scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:123)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> > scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:322)
> >
> >         at
> >
> kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:219)
> >
> >         at
> >
> kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:140)
> >
> >         at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
> >
> >         at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> >
> >         at java.lang.Thread.run(Thread.java:744)
> >
> > Caused by: kafka.common.StateChangeFailedException: No other replicas in
> > ISR 13 for [myTopic,122] besides shutting down brokers 13
> >
> >         at
> >
> kafka.controller.ControlledShutdownLeaderSelector.selectLeader(PartitionLeaderSelector.scala:181)
> >
> >         at
> >
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:320)
> >
> >         ... 19 more
> >
> > On Sat, Nov 22, 2014 at 1:17 PM, Harsha <kafka@harsha.io> wrote:
> >
> > > Rajiv ,
> > >           which version of kafka are you using and do you see any
> errors
> > >           when the server goes down after sending few messages.
> > > -Harsha
> > >
> > > On Sat, Nov 22, 2014, at 01:05 PM, Rajiv Kurian wrote:
> > > > The brokers also seem unavailable while this is going on.  Each of
> these
> > > > log messages takes 2-3 seconds so at about  1200 partitions it takes
> up
> > > > quite a bit of time. Ultimately it does recover though but sadly it
> goes
> > > > down soon enough after I start sending it messages.
> > > >
> > > > On Sat, Nov 22, 2014 at 11:23 AM, Rajiv Kurian <rajiv@signalfuse.com
> >
> > > > wrote:
> > > >
> > > > > A 3 node kafka broker cluster went down yesterday (all nodes) and
I
> > > just
> > > > > noticed it this morning. When I restarted it this morning, I see
a
> > > lengthy
> > > > > list of messages like this:
> > > > >
> > > > > Loading log 'mytopic-partitionNum"
> > > > > Recovering unflushed segment 'some number' of in log
> > > mytopic-partitionNum.
> > > > > Completed load of log mytopic-partitionNum with log end offset
> > > someOffset
> > > > >
> > > > > It's been going on for more than 30 minutes since I restarted the
> > > broker.
> > > > > I have quite a few partitions (over 1000) but I still wouldn't
> expect
> > > it to
> > > > > take such a long time.
> > > > >
> > > > > Any ideas on how I should investigate the problem?
> > > > >
> > > > > Thanks!
> > > > >
> > >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message