kafka-users mailing list archives

From jjian fan <xiaofanhad...@gmail.com>
Subject Re: error in LogManager.createlog()
Date Thu, 09 Aug 2012 04:03:58 GMT
Jun:

    I have located the problem. It was caused by our Cisco router. Under high load, our Cisco router (2960S) will drop some packets due to its limited capacity. So socket.recv() should be fine; we just need to handle the log directory being corrupted by a topic name with a null byte in this scenario.

Jian Fan

2012/8/7 Jun Rao <junrao@gmail.com>

> Thanks for the pointer to the paper. However, the socket buffer overflow
> issue mentioned in the paper seems to be a performance issue, not a
> correctness issue. That is, whatever bytes socket.recv() returns should not
> be corrupted. Is this not true?
>
> Jun
>
> On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:
>
> > The exception reason may be tcp buffer overflow, pls check the paper
> >
> > http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
> >
> > Thanks!
> >
> > 2012/8/2 jjian fan <xiaofanhadoop@gmail.com>
> >
> > > Jun:
> > >
> > > How powerful is the broker machine? You can deploy more producer
> > > clients to increase the broker's load. In my test, we send 300 thousand
> > > messages per second to the broker; the message size is 1024 bytes. In this
> > > scenario, these exceptions are often seen.
> > >
> > > Thanks!
> > > Jian Fan
> > >
> > > 2012/8/1 Jun Rao <junrao@gmail.com>
> > >
> > >> Jian,
> > >>
> > >> The message format is documented in the Message class and has the following format.
> > >>
> > >> /**
> > >>  * A message. The format of an N byte message is the following:
> > >>  *
> > >>  * If magic byte is 0
> > >>  *
> > >>  * 1. 1 byte "magic" identifier to allow format changes
> > >>  *
> > >>  * 2. 4 byte CRC32 of the payload
> > >>  *
> > >>  * 3. N - 5 byte payload
> > >>  *
> > >>  * If magic byte is 1
> > >>  *
> > >>  * 1. 1 byte "magic" identifier to allow format changes
> > >>  *
> > >>  * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
> > >>  *
> > >>  * 3. 4 byte CRC32 of the payload
> > >>  *
> > >>  * 4. N - 6 byte payload
> > >>  *
> > >>  */
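[Editor's note: the layout quoted above can be sketched in plain Java. This is an illustrative encoder for the magic-1 case only; the class and method names are mine, not Kafka's.]

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageSketch {
    // Builds an N-byte buffer in the magic-1 layout described above:
    // 1 byte magic, 1 byte attributes, 4 byte CRC32 of payload, payload.
    static ByteBuffer encode(byte attributes, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + payload.length);
        buf.put((byte) 1);                 // magic byte = 1
        buf.put(attributes);               // e.g. compression codec flags
        buf.putInt((int) crc.getValue());  // CRC32 of the payload
        buf.put(payload);                  // N - 6 byte payload
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer m = encode((byte) 0, "hello".getBytes());
        System.out.println(m.remaining()); // 6 header bytes + 5 payload = 11
    }
}
```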
> > >>
> > >> The flow is the following:
> > >> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends the bytes to the socket.
> > >> 2. On the server side:
> > >> 2.1 Processor.read reads the bytes off the socket and deserializes the bytes into a MultiProduceRequest
> > >> 2.2 The request is then handled in KafkaRequestHandler
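[Editor's note: as a rough illustration of step 2.1, deserialization pulls size-delimited fields back out of the fully buffered request bytes. The field layout below is hypothetical and not the actual 0.7 MultiProduceRequest wire format.]

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RequestReadSketch {
    // Step 2.1 in miniature: once all request bytes are buffered, the
    // broker reads size-delimited fields out of the buffer. A 2-byte
    // length prefix followed by that many topic bytes is illustrative.
    static String readTopic(ByteBuffer request) {
        short len = request.getShort();   // 2-byte topic-name length
        byte[] topic = new byte[len];
        request.get(topic);               // exactly `len` topic bytes
        return new String(topic, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer req = ByteBuffer.allocate(16);
        req.putShort((short) 2).put("ax".getBytes(StandardCharsets.UTF_8));
        req.flip();
        System.out.println(readTopic(req)); // prints "ax"
    }
}
```

If the length prefix or a topic byte were corrupted anywhere in steps 1-2, the parsed topic would come out wrong, which is Jun's point below about where the corruption must originate.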
> > >>
> > >> BTW, I ran your test for a couple of days, but couldn't reproduce the exception. In your test, how frequently do you see the exceptions?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >>
> > >> > Jun:
> > >> >
> > >> >    Can you give more details of the bytebuffer structure of messages, and the process of sending and receiving the messages?
> > >> >
> > >> > Thanks
> > >> >
> > >> > Jian Fan
> > >> >
> > >> >
> > >> > 2012/7/31 Jun Rao <junrao@gmail.com>
> > >> >
> > >> > > Jian,
> > >> > >
> > >> > > Thanks for the patch. It may not be the right fix though, since it fixes
> > >> > > the symptom, but not the cause. For each produce request, the broker does
> > >> > > the following: (1) read all bytes of the request into
> > >> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
> > >> > > request are ready, deserialize the bytes into a ProducerRequest
> > >> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request
> > >> > > by adding topic data to logs.
> > >> > >
> > >> > > What you observed is that in step 3, a topic name is corrupted somehow.
> > >> > > However, this means that the corresponding ProducerRequest is corrupted.
> > >> > > Assuming there is no corruption at the network layer (very unlikely), the
> > >> > > corruption must have happened in step 1 or step 2. So, instead of patching
> > >> > > a corrupted topic name, we should understand why a ProducerRequest can be
> > >> > > corrupted and fix the cause. BTW, whatever caused the corrupted topic could
> > >> > > be causing the corrupted messages too.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >> > >
> > >> > > > Jun:
> > >> > > >
> > >> > > >   Hi. I found why the error appears. In a highly concurrent environment,
> > >> > > > the TCP server will drop some packets when the TCP buffer overflows. So there
> > >> > > > is a chance that "topic" contains one or more characters that encode to
> > >> > > > bytes that include NULL (0).
> > >> > > >   I have submitted the patch to KAFKA-411, please check it!
> > >> > > >
> > >> > > > Thanks!
> > >> > > > Jian Fan
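[Editor's note: the actual fix is the KAFKA-411 patch. Purely as an illustration of the idea, a broker could reject topic names carrying NUL bytes before file.mkdir() ever sees them; the helper name here is mine, not Kafka's.]

```java
public class TopicNameCheck {
    // Rejects topic names containing NUL (0) bytes, which, as described
    // above, can otherwise end up embedded in the on-disk log directory
    // name. Illustrative only; not the actual KAFKA-411 patch.
    static boolean isValidTopic(String topic) {
        if (topic == null || topic.isEmpty()) return false;
        return topic.indexOf('\0') < 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidTopic("ax"));        // true
        System.out.println(isValidTopic("a\u0000x"));  // false: embedded NUL
    }
}
```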
> > >> > > >
> > >> > > > 2012/7/30 Jun Rao <junrao@gmail.com>
> > >> > > >
> > >> > > > > Jian,
> > >> > > > >
> > >> > > > > All log directories in kafka are created by LogManager.createLog(). As you
> > >> > > > > can see, the directory always has the form of topic-partitionId. So, it's
> > >> > > > > not clear how a directory of "a" can be created in your case. I will try to
> > >> > > > > rerun your test and see if it can be reproduced.
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > >
> > >> > > > > Jun
> > >> > > > >
> > >> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > Jay:
> > >> > > > > >
> > >> > > > > >    You can try to send 600 thousand messages per second to the broker;
> > >> > > > > > you will find that TCP drops packets, so sometimes the topic "ax" will
> > >> > > > > > become "a". I don't mean to solve the TCP problem at the application
> > >> > > > > > level; I just found there may be a bug in file.mkdir() of
> > >> > > > > > LogManager.createlog. It will affect kafka usage.
> > >> > > > > >
> > >> > > > > > Thanks
> > >> > > > > > Jian Fan
> > >> > > > > >
> > >> > > > > > 2012/7/29 Jay Kreps <jay.kreps@gmail.com>
> > >> > > > > >
> > >> > > > > > > Hmm, that is not my understanding of TCP. TCP is a reliable protocol,
> > >> > > > > > > so it is supposed to either deliver packets in order or time out
> > >> > > > > > > retrying. In the case of the topic name, that is a size-delimited
> > >> > > > > > > string; there should be no way for it to drop a single byte in the
> > >> > > > > > > middle of the request like that. If that is in fact happening, I don't
> > >> > > > > > > think it is something we can hope to recover from at the application
> > >> > > > > > > level...
> > >> > > > > > >
> > >> > > > > > > -Jay
> > >> > > > > > >
> > >> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >> > > > > > >
> > >> > > > > > > > Jun:
> > >> > > > > > > >    Dropping packets in TCP is an issue of the OS/JVM, but it can
> > >> > > > > > > > also cause some kafka issues!
> > >> > > > > > > >    For example, the topic of the message is "ax", but it can change
> > >> > > > > > > > to "a" in the broker because some packets are dropped, so the log
> > >> > > > > > > > directory should be like a-0, a-1, a-2 and so on, but file.mkdir()
> > >> > > > > > > > creates a log directory like "a". Seems there are some bugs in
> > >> > > > > > > > file.mkdir() of LogManager.createlog.
> > >> > > > > > > >    If you shut down the broker and restart it, the broker will
> > >> > > > > > > > report an exception like this:
> > >> > > > > > > >
> > >> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > >> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > >> > > > > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > >> > > > > > > >     at java.lang.String.substring(String.java:1949)
> > >> > > > > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > >> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > >> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > >> > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >> > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >> > > > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > >> > > > > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > >> > > > > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > >> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > >> > > > > > > >     at kafka.Kafka.main(Kafka.scala)
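[Editor's note: the trace makes sense given how Utils.getTopicPartition splits a log directory name on its last '-': a corrupted directory named just "a" has no dash, lastIndexOf returns -1, and substring(0, -1) throws at startup. A rough Java equivalent with a guard; names are illustrative, not the actual 0.7 code.]

```java
public class TopicPartitionParse {
    // Splits a log directory name of the form topic-partitionId, as
    // Utils.getTopicPartition does. With a corrupted directory like "a",
    // lastIndexOf('-') returns -1 and an unguarded substring(0, -1)
    // throws StringIndexOutOfBoundsException, as in the trace above.
    static String[] parse(String dirName) {
        int index = dirName.lastIndexOf('-');
        if (index < 0)
            throw new IllegalArgumentException("not topic-partition: " + dirName);
        return new String[] { dirName.substring(0, index),
                              dirName.substring(index + 1) };
    }

    public static void main(String[] args) {
        String[] tp = parse("axx-0");
        System.out.println(tp[0] + " / " + tp[1]); // prints "axx / 0"
    }
}
```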
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > 2012/7/28 Jun Rao <junrao@gmail.com>
> > >> > > > > > > >
> > >> > > > > > > > > Jian,
> > >> > > > > > > > >
> > >> > > > > > > > > I am not sure if I understand this completely. Dropping packets
> > >> > > > > > > > > in TCP shouldn't cause corruption in the TCP buffer, right? Is
> > >> > > > > > > > > this an issue in Kafka or in the OS/JVM?
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks,
> > >> > > > > > > > >
> > >> > > > > > > > > Jun
> > >> > > > > > > > >
> > >> > > > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Jun:
> > >> > > > > > > > > > Yes, if the socket server can't handle the packets quickly, the
> > >> > > > > > > > > > TCP protocol will drop some network packets until the buffer
> > >> > > > > > > > > > overflows; the corrupted messages also appear in this situation!
> > >> > > > > > > > > > I ran a systemtap script to find the packet dropping; you can
> > >> > > > > > > > > > also type "cat /proc/net/sockstat" to see the TCP memory
> > >> > > > > > > > > > increase. I debugged the whole kafka source code to find the bug
> > >> > > > > > > > > > in file.mkdir() of LogManager.createlog.
> > >> > > > > > > > > >
> > >> > > > > > > > > > Jian Fan
> > >> > > > > > > > > >
> > >> > > > > > > > > > 2012/7/27 Jun Rao <junrao@gmail.com>
> > >> > > > > > > > > >
> > >> > > > > > > > > > > Thanks for the finding. Are you saying that this problem is
> > >> > > > > > > > > > > caused by the buffering in the Kafka socket server? How did
> > >> > > > > > > > > > > you figure that out? Is this problem exposed by the same test
> > >> > > > > > > > > > > that caused the corrupted messages in the broker?
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Thanks,
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Jun
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > >     In a highly concurrent environment, the TCP server will
> > >> > > > > > > > > > > > drop some packets when the TCP buffer overflows. Then
> > >> > > > > > > > > > > > LogManager.createlog will create logs for non-existent
> > >> > > > > > > > > > > > topics. But one thing is very strange: the log directory
> > >> > > > > > > > > > > > should be like a-0, a-1, a-2 and so on, but file.mkdir()
> > >> > > > > > > > > > > > creates a log directory like "a". Seems there is some bug in
> > >> > > > > > > > > > > > file.mkdir() of LogManager.createlog.
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > The exception message is:
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > >> > > > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > >> > > > > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > >> > > > > > > > > > > >     at java.io.RandomAccessFile.open(Native Method)
> > >> > > > > > > > > > > >     at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > >> > > > > > > > > > > >     at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > >> > > > > > > > > > > >     at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > >> > > > > > > > > > > >     at kafka.log.Log.loadSegments(Log.scala:144)
> > >> > > > > > > > > > > >     at kafka.log.Log.<init>(Log.scala:116)
> > >> > > > > > > > > > > >     at kafka.log.LogManager.createLog(LogManager.scala:159)
> > >> > > > > > > > > > > >     at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > >> > > > > > > > > > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >> > > > > > > > > > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >> > > > > > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > >> > > > > > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > >> > > > > > > > > > > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >> > > > > > > > > > > >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > >> > > > > > > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > >> > > > > > > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > >> > > > > > > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > >> > > > > > > > > > > >     at java.lang.Thread.run(Thread.java:679)
> > >> > > > > > > > > > > >
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>
