kafka-users mailing list archives

From jjian fan <xiaofanhad...@gmail.com>
Subject Re: error in LogManager.createlog()
Date Wed, 01 Aug 2012 13:43:39 GMT
Jun:

   Can you give more detail on the ByteBuffer structure of messages, and on
the process of sending and receiving them?

Thanks

Jian Fan


2012/7/31 Jun Rao <junrao@gmail.com>

> Jian,
>
> Thanks for the patch. It may not be the right fix though since it fixes
> the symptom, but not the cause. For each produce request, the broker does
> the following: (1) read all bytes of the request into
> a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
> request are ready, deserialize the bytes into a ProducerRequest
> (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request
> by adding topic data to logs.
>
> What you observed is that in step 3, a topic name is corrupted somehow.
> However, this means that the corresponding ProducerRequest is corrupted.
> Assuming there is no corruption at the network layer (very unlikely), the
> corruption must have happened in step 1 or step 2. So, instead of patching
> a corrupted topic name, we should understand why a ProducerRequest can be
> corrupted and fix the cause. BTW, whatever caused the corrupted topic could
> be causing the corrupted messages too.
>
> Thanks,
>
> Jun
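
A minimal, self-contained Scala sketch of the framing idea behind steps (1) and (2)
that Jun describes above: a request carries a size header, the server accumulates
bytes until that many have arrived, and only then deserializes. The names
(RequestFramingSketch, tryComplete) are illustrative, not the actual
SocketServer/BoundedByteBufferReceive API.

    import java.nio.ByteBuffer

    // Illustrative only -- not the actual BoundedByteBufferReceive code.
    object RequestFramingSketch {
      // Step (1): hand the body over only once every byte promised by the
      // 4-byte size header has arrived.
      def tryComplete(received: ByteBuffer): Option[ByteBuffer] = {
        if (received.remaining() < 4) return None
        val size = received.getInt(received.position())    // peek at the size header
        if (received.remaining() - 4 < size) None           // still waiting for bytes
        else {
          received.getInt()                                 // consume the header
          Some(received)                                    // step (2): now safe to deserialize
        }
      }

      def main(args: Array[String]): Unit = {
        val buf = ByteBuffer.allocate(8).putInt(4).putInt(42)
        buf.flip()
        println(tryComplete(buf).map(_.getInt()))           // prints Some(42) once complete
      }
    }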
>
> On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhadoop@gmail.com>
> wrote:
>
> > Jun:
> >
> >   Hi. I found why the error appears. In a highly concurrent environment, the TCP
> > server will drop some packets when the TCP buffer overflows. So there is a
> > chance that "topic" contains one or more characters that encode to bytes
> > that include NULL (0).
> >   I have submitted the patch to KAFKA-411, please check it!
> >
> > Thanks!
> > Jian Fan
> >
> > 2012/7/30 Jun Rao <junrao@gmail.com>
> >
> > > Jian,
> > >
> > > All log directories in kafka are created by LogManager.createLog(). As you
> > > can see, the directory always has the form of topic-partitionId. So, it's
> > > not clear how a directory of "a" can be created in your case. I will try
> > > to rerun your test and see if it can be reproduced.
> > >
> > > Thanks,
> > >
> > > Jun
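
A quick illustration of the "topic-partitionId" naming Jun mentions above; logDirFor
is a hypothetical helper, not the actual LogManager.createLog code:

    import java.io.File

    object LogDirSketch {
      // The directory name is always topic + "-" + partition, e.g. "axx" / 0 -> axx-0.
      def logDirFor(logDir: File, topic: String, partition: Int): File =
        new File(logDir, topic + "-" + partition)

      def main(args: Array[String]): Unit = {
        println(logDirFor(new File("/data/kafka"), "axx", 0))   // /data/kafka/axx-0
        // Even a one-character topic still gets the "-partition" suffix, so a bare
        // directory named "a" cannot come out of this naming scheme by itself.
        println(logDirFor(new File("/data/kafka"), "a", 0))     // /data/kafka/a-0
      }
    }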
> > >
> > > On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhadoop@gmail.com>
> > > wrote:
> > >
> > > > Jay:
> > > >
> > > >    You can try to send 600 thousand messages per second to the broker; you
> > > > can find that TCP will drop packets, so sometimes the topic "ax" will become
> > > > "a". I don't mean to solve the TCP problem at the application level, I just
> > > > find there is maybe a bug in file.mkdir() of LogManager.createLog. It will
> > > > affect Kafka usage.
> > > >
> > > > Thanks
> > > > Jian Fan
> > > >
> > > > 2012/7/29 Jay Kreps <jay.kreps@gmail.com>
> > > >
> > > > > Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so it
> > > > > is supposed to either deliver packets in order or time out retrying. The
> > > > > topic name is a size-delimited string, so there should be no way for it to
> > > > > drop a single byte in the middle of the request like that. If that is in
> > > > > fact happening, I don't think it is something we can hope to recover from
> > > > > at the application level...
> > > > >
> > > > > -Jay
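
To make the size-delimited-string point concrete, here is a minimal, self-contained
Scala sketch. The encoding (a 2-byte length prefix followed by the bytes) and the
names (SizeDelimitedSketch, writeShortString, readShortString) are illustrative, not
Kafka's exact wire format: losing a byte in transit shifts everything after it
instead of cleanly shortening "ax" to "a".

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets.UTF_8

    // Illustrative encoding only -- not Kafka's exact wire format.
    object SizeDelimitedSketch {
      def writeShortString(buf: ByteBuffer, s: String): Unit = {
        val bytes = s.getBytes(UTF_8)
        buf.putShort(bytes.length.toShort)   // 2-byte length prefix
        buf.put(bytes)
      }

      def readShortString(buf: ByteBuffer): String = {
        val len = buf.getShort()
        val bytes = new Array[Byte](len)
        buf.get(bytes)
        new String(bytes, UTF_8)
      }

      def main(args: Array[String]): Unit = {
        val intact = ByteBuffer.allocate(16)
        writeShortString(intact, "ax")
        intact.putInt(42)                    // stands in for the rest of the request
        intact.flip()

        // Simulate losing the byte for 'x' somewhere in transit.
        val raw = new Array[Byte](intact.remaining())
        intact.get(raw)
        val corrupted = ByteBuffer.wrap(raw.take(3) ++ raw.drop(4))

        // The length prefix still says 2, so the reader pulls in 'a' (97) plus a
        // shifted NUL (0) from the next field -- garbage, not a clean truncation.
        println(readShortString(corrupted).map(_.toInt))
        println(corrupted.remaining())       // 3 bytes left where a 4-byte int was expected
      }
    }

If a byte really were lost this way, the shifted bytes would also be consistent with
the observation elsewhere in this thread about topic names containing NULL (0) bytes.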
> > > > >
> > > > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhadoop@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Jun:
> > > > > >    Dropping packets in TCP is an issue of the OS/JVM, but it can also cause
> > > > > > some Kafka issues!
> > > > > >    For example, the topic of the message is "ax", but it can change to "a"
> > > > > > in the broker because some packets are dropped, so the log directory should
> > > > > > be like a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory
> > > > > > like "a". Seems there are some bugs in file.mkdir() of LogManager.createLog.
> > > > > >    If you shut down the broker and restart it, the broker will report an
> > > > > > exception like this:
> > > > > >
> > > > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > > > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> > > > > > startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > > > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > > > > >     at java.lang.String.substring(String.java:1949)
> > > > > >     at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > > > >     at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > > > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > >     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > >     at kafka.log.LogManager.<init>(LogManager.scala:65)
> > > > > >     at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > > > >     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > > > >     at kafka.Kafka$.main(Kafka.scala:50)
> > > > > >     at kafka.Kafka.main(Kafka.scala)
> > > > > >
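
The failing frame above (kafka.utils.Utils$.getTopicPartition) is consistent with
parsing of this shape, sketched here for illustration (not the exact Utils source):
log directory names are expected to look like "topic-partition", so a directory
named just "a" has no '-' to split on.

    // Sketch only, based on the stack trace above.
    object TopicPartitionSketch {
      def topicPartition(dirName: String): (String, Int) = {
        val sep = dirName.lastIndexOf('-')
        // For a directory named just "a", sep is -1 and substring(0, -1) throws the
        // "String index out of range: -1" exception seen above.
        (dirName.substring(0, sep), dirName.substring(sep + 1).toInt)
      }

      def main(args: Array[String]): Unit = {
        println(topicPartition("a-0"))   // (a,0) -- the expected layout
        println(topicPartition("a"))     // throws StringIndexOutOfBoundsException
      }
    }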
> > > > > >
> > > > > > 2012/7/28 Jun Rao <junrao@gmail.com>
> > > > > >
> > > > > > > Jian,
> > > > > > >
> > > > > > > I am not sure if I understand this completely. Dropping packets in TCP
> > > > > > > shouldn't cause corruption in the TCP buffer, right? Is this an issue in
> > > > > > > Kafka or the OS/JVM?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhadoop@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Jun:
> > > > > > > > Yes, if the socket server can't handle the packets quickly, the TCP
> > > > > > > > protocol will drop some network packets until the buffer overflows; the
> > > > > > > > corrupted messages also appear in this situation! I ran a systemtap
> > > > > > > > script to find the packet dropping; also, you can type "cat
> > > > > > > > /proc/net/sockstat" to see the TCP memory increase. I debugged the whole
> > > > > > > > Kafka source code to find the bug in file.mkdir() of LogManager.createLog.
> > > > > > > >
> > > > > > > > Jian Fan
> > > > > > > >
> > > > > > > > 2012/7/27 Jun Rao <junrao@gmail.com>
> > > > > > > >
> > > > > > > > > Thanks for the finding. Are you saying that this problem is caused by
> > > > > > > > > the buffering in Kafka socket server? How did you figure that out? Is
> > > > > > > > > this problem exposed by the same test that caused the corrupted
> > > > > > > > > messages in the broker?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhadoop@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > >     In a highly concurrent environment, the TCP server will drop some
> > > > > > > > > > packets when the TCP buffer overflows. Then LogManager.createLog will
> > > > > > > > > > create some non-existent topic logs. But one thing is very strange: the
> > > > > > > > > > log directory should be like a-0, a-1, a-2 and so on, but file.mkdir()
> > > > > > > > > > creates a log directory like "a". Seems there is some bug in file.mkdir()
> > > > > > > > > > of LogManager.createLog.
> > > > > > > > > >
> > > > > > > > > > The exception message is:
> > > > > > > > > >
> > > > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > > > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > > > > > > > > at java.io.RandomAccessFile.open(Native Method)
> > > > > > > > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > > > > > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > > > > > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
> > > > > > > > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > > > > > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > > > > at java.lang.Thread.run(Thread.java:679)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
