kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: error in LogManager.createlog()
Date Mon, 06 Aug 2012 16:53:35 GMT
Thanks for the pointer to the paper. However, the socket buffer overflow
issue mentioned in the paper seems to be a performance issue, not a
correctness issue. That is, whatever bytes socket.recv() returns should not
be corrupted. Is this not true?

Jun

On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> The cause of the exception may be TCP buffer overflow; please check the paper
> http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
>
> Thanks!
>
> 2012/8/2 jjian fan <xiaofanhadoop@gmail.com>
>
> > Jun:
> >
> >     How about the server power of the broker? You can deploy more
> > producer clients to increase the broker pressure. In my test, we send
> > 300 thousand messages per second to the broker, and the message size is
> > 1024. In this scenario, these exceptions are often seen.
> >
> > Thanks!
> > Jian Fan
> >
> > 2012/8/1 Jun Rao <junrao@gmail.com>
> >
> >> Jian,
> >>
> >> The message format is documented in the Message class and has the
> >> following format.
> >>
> >> /**
> >>  * A message. The format of an N byte message is the following:
> >>  *
> >>  * If magic byte is 0
> >>  *
> >>  * 1. 1 byte "magic" identifier to allow format changes
> >>  *
> >>  * 2. 4 byte CRC32 of the payload
> >>  *
> >>  * 3. N - 5 byte payload
> >>  *
> >>  * If magic byte is 1
> >>  *
> >>  * 1. 1 byte "magic" identifier to allow format changes
> >>  *
> >>  * 2. 1 byte "attributes" identifier to allow annotations on the message
> >> independent of the version (e.g. compression enabled, type of codec
> used)
> >>  *
> >>  * 3. 4 byte CRC32 of the payload
> >>  *
> >>  * 4. N - 6 byte payload
> >>  *
> >>  */
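The layout documented above can be sketched as a small encoder/parser. This is a Python sketch for illustration only (the real implementation is the Scala Message class); note that the CRC covers only the payload, not any request header fields such as the topic name.

```python
import struct
import zlib

def build_message(payload: bytes, magic: int = 1, attributes: int = 0) -> bytes:
    """Build an N-byte message following the layout documented above."""
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    if magic == 0:
        # 1 byte magic + 4 byte CRC32 + (N - 5) byte payload
        return struct.pack(">BI", 0, crc) + payload
    # 1 byte magic + 1 byte attributes + 4 byte CRC32 + (N - 6) byte payload
    return struct.pack(">BBI", magic, attributes, crc) + payload

def parse_message(buf: bytes):
    """Return (magic, attributes, payload); raise if the CRC does not match."""
    magic = buf[0]
    if magic == 0:
        attributes = 0
        (crc,) = struct.unpack(">I", buf[1:5])
        payload = buf[5:]
    else:
        attributes = buf[1]
        (crc,) = struct.unpack(">I", buf[2:6])
        payload = buf[6:]
    if zlib.crc32(payload) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch: corrupt payload")
    return magic, attributes, payload
```

A payload corrupted in flight fails the CRC check here, which is consistent with the observation in this thread that the corrupted field was the topic name in the request header, outside the checksummed payload.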
> >>
> >> The flow is the following:
> >> 1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends
> >> the bytes to socket.
> >> 2. On the server side:
> >> 2.1 Processor.read reads the bytes off socket and deserializes the bytes
> >> into a MultiProduceRequest
> >> 2.2 The request is then handled in KafkaRequestHandler
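The server-side half of the flow above (accumulate all bytes of a request first, then deserialize) can be sketched roughly as follows. The 4-byte size prefix and the helper name are illustrative assumptions, not the exact Kafka wire code; the point is that deserialization only starts once the whole request has arrived, as BoundedByteBufferReceive does.

```python
import io
import struct

def read_complete_request(conn) -> bytes:
    """Accumulate an entire size-prefixed request before any deserialization,
    analogous to what BoundedByteBufferReceive does for SocketServer.read."""
    header = b""
    while len(header) < 4:
        chunk = conn.read(4 - len(header))
        if not chunk:
            raise EOFError("connection closed mid-header")
        header += chunk
    (size,) = struct.unpack(">I", header)
    body = b""
    while len(body) < size:
        chunk = conn.read(size - len(body))
        if not chunk:
            raise EOFError("connection closed mid-request")
        body += chunk
    return body  # only now is it safe to deserialize into a request object
```

For example, feeding it a stream containing a 5-byte request plus trailing bytes returns exactly the 5-byte body and leaves the rest unread.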
> >>
> >> BTW, I ran your test for a couple of days, but couldn't reproduce the
> >> exception. In your test, how frequently do you see the exceptions?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhadoop@gmail.com>
> >> wrote:
> >>
> >> > Jun:
> >> >
> >> >    Can you give more detail on the bytebuffer structure of messages,
> >> > and the process of sending and receiving the messages?
> >> >
> >> > Thanks
> >> >
> >> > Jian Fan
> >> >
> >> >
> >> > 2012/7/31 Jun Rao <junrao@gmail.com>
> >> >
> >> > > Jian,
> >> > >
> >> > > Thanks for the patch. It may not be the right fix though, since it
> >> > > fixes the symptom, but not the cause. For each produce request, the
> >> > > broker does the following: (1) read all bytes of the request into
> >> > > a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes
> >> > > of the request are ready, deserialize the bytes into a ProducerRequest
> >> > > (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the
> >> > > request by adding topic data to logs.
> >> > >
> >> > > What you observed is that in step 3, a topic name is corrupted
> >> > > somehow. However, this means that the corresponding ProducerRequest is
> >> > > corrupted. Assuming there is no corruption at the network layer (very
> >> > > unlikely), the corruption must have happened in step 1 or step 2. So,
> >> > > instead of patching a corrupted topic name, we should understand why a
> >> > > ProducerRequest can be corrupted and fix the cause. BTW, whatever
> >> > > caused the corrupted topic could be causing the corrupted messages too.
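To see why any lost or shifted byte in steps 1-2 surfaces as a corrupt topic name in step 3, consider a toy length-prefixed encoding. This framing (2-byte topic length, topic bytes, payload) is a hypothetical simplification for illustration, not Kafka's exact wire format:

```python
import struct

def encode(topic: str, payload: bytes) -> bytes:
    # Toy framing: 2-byte big-endian topic length, topic bytes, then payload.
    t = topic.encode("utf-8")
    return struct.pack(">H", len(t)) + t + payload

def decode(buf: bytes):
    (tlen,) = struct.unpack(">H", buf[:2])
    return buf[2:2 + tlen].decode("utf-8", errors="replace"), buf[2 + tlen:]

req = encode("ax", b"message-bytes")
assert decode(req) == ("ax", b"message-bytes")

# Drop a single byte early in the stream: every later field boundary shifts,
# so the "topic" field is now read from the wrong offset and comes out garbled.
damaged = req[:1] + req[2:]
topic, rest = decode(damaged)
assert topic != "ax"
```

Once a byte goes missing upstream, the parser still trusts the length prefix, so the garbage it reads looks like a plausible (but wrong) topic name, matching what was observed in step 3.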
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > > On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhadoop@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Jun:
> >> > > >
> >> > > >   Hi. I found why the error appears. In a highly concurrent
> >> > > > environment, the TCP server will drop some packets when the TCP
> >> > > > buffer overflows. So there is a chance that "topic" contains one or
> >> > > > more characters that encode to bytes that include NULL (0).
> >> > > >   I have submitted the patch to KAFKA-411, please check it!
> >> > > >
> >> > > > Thanks!
> >> > > > Jian Fan
> >> > > >
> >> > > > 2012/7/30 Jun Rao <junrao@gmail.com>
> >> > > >
> >> > > > > Jian,
> >> > > > >
> >> > > > > All log directories in kafka are created by LogManager.createLog().
> >> > > > > As you can see, the directory always has the form of
> >> > > > > topic-partitionId. So, it's not clear how a directory of "a" can
> >> > > > > be created in your case. I will try to rerun your test and see if
> >> > > > > it can be reproduced.
> >> > > > >
> >> > > > > Thanks,
> >> > > > >
> >> > > > > Jun
> >> > > > >
> >> > > > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhadoop@gmail.com>
> >> > > > > wrote:
> >> > > > >
> >> > > > > > Jay:
> >> > > > > >
> >> > > > > >    You can try to send 600 thousand messages per second to the
> >> > > > > > broker; you will find that TCP drops packets, so sometimes the
> >> > > > > > topic ax will become a. I don't mean to solve the TCP problem at
> >> > > > > > the application level; I just find there may be a bug in
> >> > > > > > file.mkdir() of LogManager.createlog. It will affect Kafka usage.
> >> > > > > >
> >> > > > > > Thanks
> >> > > > > > Jian Fan
> >> > > > > >
> >> > > > > > 2012/7/29 Jay Kreps <jay.kreps@gmail.com>
> >> > > > > >
> >> > > > > > > Hmm, that is not my understanding of TCP. TCP is a reliable
> >> > > > > > > protocol, so it is supposed to either deliver packets in order
> >> > > > > > > or time out retrying. In the case of the topic name, that is a
> >> > > > > > > size-delimited string; there should be no way for it to drop a
> >> > > > > > > single byte in the middle of the request like that. If that is
> >> > > > > > > in fact happening, I don't think it is something we can hope to
> >> > > > > > > recover from at the application level...
> >> > > > > > >
> >> > > > > > > -Jay
> >> > > > > > >
> >> > > > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhadoop@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Jun:
> >> > > > > > > >    Dropping packets in TCP is an issue of the OS/JVM, but it
> >> > > > > > > > can also cause some Kafka issues!
> >> > > > > > > >    For example, the topic of the message is ax, but it can
> >> > > > > > > > change to a in the broker because some packets are dropped.
> >> > > > > > > > So the log directory should be like a-0, a-1, a-2 and so on,
> >> > > > > > > > but file.mkdir() creates a log directory like a. Seems there
> >> > > > > > > > are some bugs in file.mkdir() of LogManager.createlog.
> >> > > > > > > >    If you shut down the broker and restart it, the broker
> >> > > > > > > > will report an exception like this:
> >> > > > > > > >
> >> > > > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> >> > > > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> >> > > > > > > > startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> >> > > > > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> >> > > > > > > >     at java.lang.String.substring(String.java:1949)
> >> > > > > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> >> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> >> > > > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> >> > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >> > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >> > > > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> >> > > > > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> >> > > > > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> >> > > > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> >> > > > > > > >     at kafka.Kafka.main(Kafka.scala)
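The StringIndexOutOfBoundsException in this trace comes from splitting the log directory name on its last '-': a directory named just "a" has no dash, so lastIndexOf returns -1 and substring fails. A Python sketch of the equivalent logic (a hypothetical port of kafka.utils.Utils.getTopicPartition, for illustration only):

```python
def get_topic_partition(dir_name: str):
    """Split a log directory name of the form <topic>-<partition>,
    mirroring the logic of kafka.utils.Utils.getTopicPartition."""
    idx = dir_name.rfind("-")  # like Scala's lastIndexOf: -1 when absent
    if idx < 0:
        # In the Scala code this is where substring is called with -1,
        # raising StringIndexOutOfBoundsException and aborting broker startup.
        raise ValueError(f"malformed log directory name: {dir_name!r}")
    return dir_name[:idx], int(dir_name[idx + 1:])

assert get_topic_partition("axx-0") == ("axx", 0)
```

So a stray directory like "a" (without the -partitionId suffix) is enough to make every subsequent broker restart fail while loading logs.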
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > 2012/7/28 Jun Rao <junrao@gmail.com>
> >> > > > > > > >
> >> > > > > > > > > Jian,
> >> > > > > > > > >
> >> > > > > > > > > I am not sure if I understand this completely. Dropping
> >> > > > > > > > > packets in TCP shouldn't cause corruption in the TCP
> >> > > > > > > > > buffer, right? Is this an issue in Kafka or the OS/JVM?
> >> > > > > > > > >
> >> > > > > > > > > Thanks,
> >> > > > > > > > >
> >> > > > > > > > > Jun
> >> > > > > > > > >
> >> > > > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan
> >> > > > > > > > > <xiaofanhadoop@gmail.com> wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Jun:
> >> > > > > > > > > > Yes, if the socket server can't handle the packets
> >> > > > > > > > > > quickly, the TCP protocol will drop some network packets
> >> > > > > > > > > > until the buffer overflows; the corrupted messages also
> >> > > > > > > > > > appear in this situation! I ran a systemtap script to
> >> > > > > > > > > > find the packet dropping; you can also type "cat
> >> > > > > > > > > > /proc/net/sockstat" to see the TCP memory increase. I
> >> > > > > > > > > > debugged the whole Kafka source code to find the bug in
> >> > > > > > > > > > file.mkdir() of LogManager.createlog.
> >> > > > > > > > > >
> >> > > > > > > > > > Jian Fan
> >> > > > > > > > > >
> >> > > > > > > > > > 2012/7/27 Jun Rao <junrao@gmail.com>
> >> > > > > > > > > >
> >> > > > > > > > > > > Thanks for the finding. Are you saying that this
> >> > > > > > > > > > > problem is caused by the buffering in the Kafka socket
> >> > > > > > > > > > > server? How did you figure that out? Is this problem
> >> > > > > > > > > > > exposed by the same test that caused the corrupted
> >> > > > > > > > > > > messages in the broker?
> >> > > > > > > > > > >
> >> > > > > > > > > > > Thanks,
> >> > > > > > > > > > >
> >> > > > > > > > > > > Jun
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan
> >> > > > > > > > > > > <xiaofanhadoop@gmail.com> wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > >     In a highly concurrent environment, the tcp
> >> > > > > > > > > > > > server will drop some packets when the tcp buffer
> >> > > > > > > > > > > > overflows. Then LogManager.createlog will create some
> >> > > > > > > > > > > > non-existent topic logs. But one thing is very
> >> > > > > > > > > > > > strange: the log directory should be like a-0, a-1,
> >> > > > > > > > > > > > a-2 and so on, but file.mkdir() creates a log
> >> > > > > > > > > > > > directory like a. Seems there is some bug in
> >> > > > > > > > > > > > file.mkdir() of LogManager.createlog.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > The exception message is:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0
> >> > > > > > > > > > > > (kafka.log.LogManager)
> >> > > > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest
> >> > > > > > > > > > > > on axx:0 (kafka.server.KafkaRequestHandlers)
> >> > > > > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka
> >> > > > > > > > > > > > (Is a directory)
> >> > > > > > > > > > > >     at java.io.RandomAccessFile.open(Native Method)
> >> > > > > > > > > > > >     at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> >> > > > > > > > > > > >     at kafka.utils.Utils$.openChannel(Utils.scala:324)
> >> > > > > > > > > > > >     at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> >> > > > > > > > > > > >     at kafka.log.Log.loadSegments(Log.scala:144)
> >> > > > > > > > > > > >     at kafka.log.Log.<init>(Log.scala:116)
> >> > > > > > > > > > > >     at kafka.log.LogManager.createLog(LogManager.scala:159)
> >> > > > > > > > > > > >     at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> >> > > > > > > > > > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >> > > > > > > > > > > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >> > > > > > > > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >> > > > > > > > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >> > > > > > > > > > > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> >> > > > > > > > > > > >     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> >> > > > > > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> >> > > > > > > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >> > > > > > > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> >> > > > > > > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> >> > > > > > > > > > > >     at java.lang.Thread.run(Thread.java:679)
