kafka-users mailing list archives

From 王国栋 <wangg...@gmail.com>
Subject Re: Kafka throw InvalidMessageException and lost data
Date Thu, 21 Mar 2013 04:42:48 GMT
Hi Jun,

We use one thread with one sync producer to send data to the broker
(QPS: 10k-15k, each log is about 1K bytes). The problem is reproduced.

We have used both Producer and SyncProducer in our test. The same exception
appears.

Thanks.
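The BufferUnderflowException and IllegalArgumentException in the broker logs
below come from parsing a length-prefixed field out of a buffer that holds
fewer bytes than the length field claims. A minimal, stdlib-only Java sketch
of that failure mode (illustrative code only, not Kafka source; the class and
method names here are made up):

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Illustrative only (not Kafka source). A "short string" is encoded as a
// 2-byte length followed by that many bytes, similar to what
// kafka.utils.Utils.readShortString parses in the stack traces below.
public class ShortStringParse {
    public static String readShortString(ByteBuffer buf) {
        short len = buf.getShort();   // underflows if fewer than 2 bytes remain
        byte[] bytes = new byte[len];
        buf.get(bytes);               // underflows if fewer than len bytes remain
        return new String(bytes);
    }

    public static void main(String[] args) {
        // Well-formed: length 5 followed by "topic".
        ByteBuffer ok = ByteBuffer.allocate(7);
        ok.putShort((short) 5).put("topic".getBytes());
        ok.flip();
        System.out.println(readShortString(ok)); // prints "topic"

        // Truncated: the header claims 5 bytes but only 2 arrived.
        ByteBuffer truncated = ByteBuffer.allocate(4);
        truncated.putShort((short) 5).put("to".getBytes());
        truncated.flip();
        try {
            readShortString(truncated);
        } catch (BufferUnderflowException e) {
            System.out.println("underflow, like the broker-side error");
        }
    }
}
```

So whenever a producer's request bytes arrive truncated or shifted, the
broker's request parser fails with exactly this family of exceptions.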



On Thu, Mar 21, 2013 at 12:19 PM, Jun Rao <junrao@gmail.com> wrote:

> How many threads are you using?
>
> Thanks,
>
> Jun
>
> On Wed, Mar 20, 2013 at 7:33 PM, Yang Zhou <zhou.yang.here@gmail.com>
> wrote:
>
> > Sorry, I made a mistake; we use many threads producing at the same time.
> >
> >
> > 2013/3/20 Jun Rao <junrao@gmail.com>
> >
> > > How many producer instances do you have? Can you reproduce the problem
> > > with a single producer?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Mar 20, 2013 at 12:29 AM, 王国栋 <wanggd04@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > We do not use any compression in our test.
> > > >
> > > > We deploy the producer and broker on the same machine. The problem still
> > > > exists. We use a sync producer and send one message at a time (no
> > > > batching now). We find that when the QPS reaches more than 40k, the
> > > > exception appears, so I don't think it's an underlying system error.
> > > >
> > > > Any suggestions if we want to do some debugging on Kafka
> > > > serialization/deserialization?
> > > >
> > > > Thanks.
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Mar 20, 2013 at 12:10 AM, Jun Rao <junrao@gmail.com> wrote:
> > > >
> > > > > It basically means that the broker is expecting to read a certain
> > > > > number of bytes in a buffer received from the socket, but there are
> > > > > fewer bytes than expected in the buffer. Possible causes are (1) a bug
> > > > > in Kafka request serialization/deserialization logic; (2) corruption
> > > > > in the underlying system, such as the network.
> > > > >
> > > > > BTW, did you enable compression in your producer?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Mar 18, 2013 at 10:12 PM, Helin Xiang <xkeegan@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks, Jun.
> > > > > >
> > > > > > We are using the Java producer.
> > > > > > Does the last exception
> > > > > > "java.lang.IllegalArgumentException
> > > > > >     at java.nio.Buffer.limit(Buffer.java:266)
> > > > > > "
> > > > > > also mean the broker received corrupted messages? Sorry, I am not
> > > > > > familiar with Java NIO.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <junrao@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Hmm, both log4j messages suggest that the broker received some
> > > > > > > corrupted produce requests. Are you using the Java producer? Also,
> > > > > > > we have seen that network router problems caused corrupted
> > > > > > > requests before.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <
> xkeegan@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > We were doing some performance testing using Kafka 0.7.2, with
> > > > > > > > only 1 broker.
> > > > > > > > On the producer client, we use 8 threads to send logs; each
> > > > > > > > thread uses a sync producer and sends 100 logs at a time (each
> > > > > > > > log is about 1~2K bytes long). The total QPS is about 30K.
> > > > > > > > But the number of logs that the consumer reads and that the
> > > > > > > > broker counts is less than the number the producer sends, so we
> > > > > > > > believe the data is lost while the producer is sending logs to
> > > > > > > > the broker.
> > > > > > > >
> > > > > > > > We lowered the QPS to 10K and still lost logs.
> > > > > > > > We found some exceptions in the broker logs:
> > > > > > > >
> > > > > > > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers  - Error processing ProduceRequest on abc:0
> > > > > > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
> > > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > > > > > > >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > > > > >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > > > > >     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> > > > > > > >     at kafka.log.Log.append(Log.scala:218)
> > > > > > > >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > > > > >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> > > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > >     at java.lang.Thread.run(Thread.java:636)
> > > > > > > >
> > > > > > > > Or this:
> > > > > > > >
> > > > > > > > 1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> > > > > > > > java.nio.BufferUnderflowException
> > > > > > > >     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
> > > > > > > >     at java.nio.ByteBuffer.get(ByteBuffer.java:692)
> > > > > > > >     at kafka.utils.Utils$.readShortString(Utils.scala:123)
> > > > > > > >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
> > > > > > > >     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> > > > > > > >     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> > > > > > > >     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> > > > > > > >     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
> > > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
> > > > > > > >
> > > > > > > > Or this:
> > > > > > > >
> > > > > > > > 1830146 [kafka-processor-0] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> > > > > > > > java.lang.IllegalArgumentException
> > > > > > > >     at java.nio.Buffer.limit(Buffer.java:266)
> > > > > > > >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
> > > > > > > >     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> > > > > > > >     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> > > > > > > >     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> > > > > > > >     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
> > > > > > > >     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
> > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > >     at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > >     at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > >     at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > >     at java.lang.Thread.run(Thread.java:636)
> > > > > > > >
> > > > > > > > This has bothered us for a few days. At first we thought it
> > > > > > > > might be some wrong configuration setting, so we changed to the
> > > > > > > > wiki's recommended configuration, but unfortunately the
> > > > > > > > exceptions still appeared.
> > > > > > > >
> > > > > > > > In what situations can these exceptions be thrown? What can we
> > > > > > > > do to avoid them?
> > > > > > > >
> > > > > > > > THANKS
> > > > > > > >
> > > > > > > > --
> > > > > > > > *Best Regards
> > > > > > > >
> > > > > > > > Xiang Helin*
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > *Best Regards
> > > > > >
> > > > > > 向河林*
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Guodong Wang
> > > > 王国栋
> > > >
> > >
> >
> >
> >
> > --
> >
> > Yang Zhou(周阳)
> >
> > Department of Computer Science and Engineering
> > Shanghai Jiao Tong University
> >
>
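One way the kind of corruption Jun describes above could arise, whatever the
actual bug turns out to be: if several threads ever write size-prefixed
requests to one shared socket without synchronization, their bytes can
interleave, and the reader then interprets payload bytes as a size header. A
deterministic, stdlib-only Java simulation of that interleaving (illustrative
code only, not Kafka source; the class and method names are made up):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative only (not Kafka source). Each request is framed as a 4-byte
// big-endian size followed by the payload. If two unsynchronized threads
// shared one socket, their writes could interleave; here the interleaving is
// simulated deterministically: A's size header, then B's whole frame, then
// A's payload.
public class InterleavedFrames {
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length).put(payload).array();
    }

    public static byte[] interleave(byte[] a, byte[] b) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        byte[] frameA = frame(a);
        byte[] frameB = frame(b);
        wire.write(frameA, 0, 4);              // A's size header goes out...
        wire.write(frameB, 0, frameB.length);  // ...B's whole frame sneaks in...
        wire.write(frameA, 4, a.length);       // ...then A's payload arrives
        return wire.toByteArray();
    }

    public static void main(String[] args) {
        byte[] stream = interleave("aaaaaaaaaa".getBytes(), "bb".getBytes());
        ByteBuffer reader = ByteBuffer.wrap(stream);
        int first = reader.getInt();                // 10: looks like a valid size
        reader.position(reader.position() + first); // skip what looks like A's payload
        int second = reader.getInt();               // reads "aaaa" bytes as a size
        System.out.println("second header claims " + second
                + " bytes but only " + reader.remaining() + " remain");
        // prints: second header claims 1633771873 bytes but only 2 remain
    }
}
```

A bogus size like that is what turns into an IllegalArgumentException at
Buffer.limit or a BufferUnderflowException on the broker side, which is why
giving each thread its own producer instance is a useful experiment.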



-- 
Guodong Wang
王国栋
