kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: error with kafka
Date Fri, 13 Jul 2012 17:15:52 GMT
Jian,

Thanks. We will take a look.

One more thing: could you try enabling debug-level logging in
kafka.producer.SyncProducer while running your test? This will enable
message verification on the producer side and will tell us whether the
corruption was introduced on the producer side or the broker side.
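
For illustration only: if the producer's logging is configured through a log4j 1.x properties file (an assumption about the test setup), a line such as log4j.logger.kafka.producer.SyncProducer=DEBUG would raise that logger to debug level. The idea behind checksum-based message verification can be sketched roughly as below. This is not Kafka's code; the class and method names are hypothetical, and it only shows how recomputing a CRC32 over the payload exposes corruption.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch of checksum-based message verification; not Kafka's implementation.
class MessageChecksumSketch {

    // Compute a CRC32 checksum over the payload bytes.
    static long checksum(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // A payload is treated as valid when the recomputed checksum matches the stored one.
    static boolean isValid(byte[] payload, long storedChecksum) {
        return checksum(payload) == storedChecksum;
    }

    public static void main(String[] args) {
        byte[] payload = "XXXXXXXX".getBytes(StandardCharsets.UTF_8);
        long stored = checksum(payload);
        System.out.println("intact:    " + isValid(payload, stored));

        // Flip one bit to simulate corruption somewhere between producer and broker.
        payload[3] ^= 0x01;
        System.out.println("corrupted: " + isValid(payload, stored));
    }
}
```

Running the same check on the producer before the send and on the broker after the receive is what lets the two sides be told apart: if the producer-side check already fails, the corruption happened before the bytes left the client.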

Thanks,

Jun

On Thu, Jul 12, 2012 at 6:51 PM, jjian fan <xiaofanhadoop@gmail.com> wrote:

> I post my code here:
>
> ProducerThread.java
> package com.tz.kafka;
>
>
> import java.io.Serializable;
> import java.util.Properties;
> import kafka.producer.ProducerConfig;
> import kafka.javaapi.producer.*;
> import java.util.*;
> import java.util.concurrent.CopyOnWriteArrayList;
>
> public class ProducerThread implements Runnable ,Serializable
> {
>   /**
>  *
>  */
> private static final long serialVersionUID = 18977854555656L;
> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>   private String topic;
>   private Properties props = new Properties();
>       private String messageStr;
>   public  ProducerThread(String kafkatopic,String message)
>   {
>     synchronized(this){
>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,192.168.75.65:2181");
> //props.put("broker.list", "4:192.168.75.104:9092");
> //props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("producer.type", "sync");
> props.put("compression.codec", "1");
> props.put("batch.size", "5");
> props.put("queue.enqueueTimeout.ms", "-1");
> props.put("queue.size", "2000");
> props.put("buffer.size", "10240000");
> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
> props.put("zk.sessiontimeout.ms", "6000000");
> props.put("zk.connectiontimeout.ms", "6000000");
> props.put("socket.timeout.ms", "60000000");
> props.put("connect.timeout.ms", "60000000");
> props.put("max.message.size", "20000");
> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
> props.put("reconnect.interval.ms", "3000");
>     // Use random partitioner. Don't need the key type. Just set it to Integer.
>     // The message is of type String.
>     //producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>     //producer = new kafka.javaapi.producer.Producer<String, String>(new ProducerConfig(props));
>     this.topic = kafkatopic;
>     this.messageStr = message;
>
>   }
>   }
>
>   public void run() {
> synchronized(this) {
> Producer<String, String> producer  = new Producer<String, String>(new
> ProducerConfig(props));
>     //producer.
> long messageNo = 0;
>     long t = System.currentTimeMillis();
>     long r = System.currentTimeMillis();
>     long time = r-t;
>     long rate = 0;
>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>     while(true)
>     {
>       if(topic.length() > 0 )
>       {
>      messageSet.add(this.messageStr.toString());
>          ProducerData<String, String> data = new ProducerData<String,
> String>(topic,null,messageSet);
>
>          producer.send(data);
>          messageSet.clear();
>          data = null;
>          messageNo++;
>
>       }
>
>       if(messageNo % 200000 ==0)
>       {
>       r = System.currentTimeMillis();
>       time = r-t;
>       rate = 200000000/time;
>       System.out.println(this.topic + " send message per second:"+rate);
>       t = r;
>       }
>
>      }
> }
>   }
>     }
>
> ProducerThreadTest1.java
>
> package com.tz.kafka;
>
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ProducerThreadTest1 {
>
> /**
>  * @param args
>  * @throws InterruptedException
>  */
> public static void main(String[] args) throws InterruptedException {
> // TODO Auto-generated method stub
> int i = Integer.parseInt(args[0]);
>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
> new ThreadPoolExecutor.DiscardOldestPolicy());
> int messageSize = Integer.parseInt(args[1]);
>  StringBuffer messageStr = new StringBuffer();
> for(int messagesize=0;messagesize<messageSize;messagesize++)
>      {
>      messageStr.append("X");
>      }
> String topic = args[2];
> for(int j=0;j < i; j++)
> {
>    topic += "x";
>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>    Thread.sleep(1000);
>
> }
> }
>
> }
>
>
> The shell script kafkaThreadTest.sh looks like this:
>
> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>
> I deployed the script on ten servers!
>
> Thanks!
> Best Regards!
>
> Jian Fan
>
> 2012/7/13 Jun Rao <junrao@gmail.com>
>
> > That seems like a Kafka bug. Do you have a script that can reproduce this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhadoop@gmail.com>
> > wrote:
> >
> > > > Hi:
> > > > I am using Kafka 0.7.1. Here is the stack trace from the Kafka server:
> > >
> > > > ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
> > > > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > > at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > > at kafka.log.Log.append(Log.scala:205)
> > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > [2012-07-13 08:40:06,182] ERROR Closing socket for
> /192.168.75.13because
> > > of error (kafka.network.Processor)
> > > kafka.message.InvalidMessageException: message is invalid, compression
> > > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> > > at
> > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> > > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> > > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> > > at kafka.log.Log.append(Log.scala:205)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > at
> > >
> > >
> >
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at
> > >
> > >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > at java.lang.Thread.run(Thread.java:722)
> > >
> > > > Here is the stack trace from the Kafka producer:
> > > > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt in 60000 ms (kafka.producer.SyncProducer)
> > > > java.net.ConnectException: Connection refused
> > > > at sun.nio.ch.Net.connect(Native Method)
> > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
> > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
> > > > at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
> > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
> > > > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
> > > > at kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
> > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
> > > > at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
> > > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
> > > > at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
> > > > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> > > > at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
> > > > at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
> > >
> > > > The Kafka producer is a multi-threaded program.
> > >
> > > Thanks!
> > >
> > > Best Regards!
> > >
> > >
> > > 2012/7/13 Neha Narkhede <neha.narkhede@gmail.com>
> > >
> > > > In addition to Jun's question,
> > > >
> > > > > which version are you using? Do you have a reproducible test case?
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <junrao@gmail.com> wrote:
> > > > > What's the stack trace?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <xiaofanhadoop@gmail.com> wrote:
> > > > >
> > > > > >> Hi:
> > > > > >>
> > > > > >> Guys, I am testing Kafka in our high-concurrency test environment, and I
> > > > > >> always get the following error message:
> > > > > >>
> > > > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
> > > > > >> (kafka.server.KafkaRequestHandlers)
> > > > > >> kafka.message.InvalidMessageException: message is invalid, compression
> > > > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init offset: 0
> > > > >>
> > > > >> Can anyone help? Thanks!
> > > > >>
> > > > >> Best Regards
> > > > >>
> > > > >> Jian Fan
> > > > >>
> > > >
> > >
> >
>
