kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjian fan <xiaofanhad...@gmail.com>
Subject Re: error with kafka
Date Fri, 13 Jul 2012 02:10:43 GMT
OK,sometime it has this error :

[2012-07-13 10:08:03,205] ERROR Closing socket for /192.168.75.15 because
of error (kafka.network.Processor)
kafka.common.InvalidTopicException: topic name can't be empty
at kafka.log.LogManager.getLogPool(LogManager.scala:159)
at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
at
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
at
kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:722)

2012/7/13 jjian fan <xiaofanhadoop@gmail.com>

> sorry!
>
> The  producer.type  shoud be async!!
>
> I have change it to sync in my new test!
>
> Best Regards!
> Jian Fan
>
>
> 2012/7/13 jjian fan <xiaofanhadoop@gmail.com>
>
>> I post my code here:
>>
>> ProducerThread.java
>> package com.tz.kafka;
>>
>>
>> import java.io.Serializable;
>> import java.util.Properties;
>> import kafka.producer.ProducerConfig;
>> import kafka.javaapi.producer.*;
>> import java.util.*;
>> import java.util.concurrent.CopyOnWriteArrayList;
>>
>> public class ProducerThread implements Runnable ,Serializable
>>  {
>>   /**
>>  *
>>  */
>>  private static final long serialVersionUID = 18977854555656L;
>> //private final kafka.javaapi.producer.Producer<Integer, String> producer;
>>   private String topic;
>>   private Properties props = new Properties();
>>       private String messageStr;
>>   public  ProducerThread(String kafkatopic,String message)
>>   {
>>     synchronized(this){
>>     props.put("zk.connect", "192.168.75.45:2181,192.168.75.55:2181,
>> 192.168.75.65:2181");
>>  //props.put("broker.list", "4:192.168.75.104:9092");
>> //props.put("serializer.class", "kafka.serializer.StringEncoder");
>>  props.put("serializer.class", "kafka.serializer.StringEncoder");
>> props.put("producer.type", "sync");
>>  props.put("compression.codec", "1");
>> props.put("batch.size", "5");
>>  props.put("queue.enqueueTimeout.ms", "-1");
>> props.put("queue.size", "2000");
>>  props.put("buffer.size", "10240000");
>> //props.put("event.handler", "kafka.producer.async.EventHandler<T>");
>>  props.put("zk.sessiontimeout.ms", "6000000");
>> props.put("zk.connectiontimeout.ms", "6000000");
>>  props.put("socket.timeout.ms", "60000000");
>> props.put("connect.timeout.ms", "60000000");
>>  props.put("max.message.size", "20000");
>> props.put("reconnect.interval", String.valueOf(Integer.MAX_VALUE));
>>  props.put("reconnect.interval.ms", "3000");
>>     // Use random partitioner. Don't need the key type. Just set it to
>> Integer.
>>     // The message is of type String.
>> //producer = new kafka.javaapi.producer.Producer<Integer, String>(new
>> ProducerConfig(props));
>>     //producer = new kafka.javaapi.producer.Producer<String, String>(new
>> ProducerConfig(props));
>>     this.topic = kafkatopic;
>>     this.messageStr = message;
>>
>>   }
>>   }
>>
>>   public void run() {
>> synchronized(this) {
>>  Producer<String, String> producer  = new Producer<String, String>(new
>> ProducerConfig(props));
>>     //producer.
>>  long messageNo = 0;
>>     long t = System.currentTimeMillis();
>>     long r = System.currentTimeMillis();
>>     long time = r-t;
>>     long rate = 0;
>>     List<String> messageSet = new CopyOnWriteArrayList<String>();
>>     while(true)
>>     {
>>       if(topic.length() > 0 )
>>       {
>>      messageSet.add(this.messageStr.toString());
>>          ProducerData<String, String> data = new ProducerData<String,
>> String>(topic,null,messageSet);
>>
>>          producer.send(data);
>>          messageSet.clear();
>>          data = null;
>>          messageNo++;
>>
>>       }
>>
>>       if(messageNo % 200000 ==0)
>>       {
>>       r = System.currentTimeMillis();
>>       time = r-t;
>>       rate = 200000000/time;
>>       System.out.println(this.topic + " send message per second:"+rate);
>>       t = r;
>>       }
>>
>>      }
>> }
>>   }
>>     }
>>
>> ProducerThreadTest1.java
>>
>> package com.tz.kafka;
>>
>> import java.util.concurrent.ThreadPoolExecutor;
>> import java.util.concurrent.TimeUnit;
>> import java.util.concurrent.LinkedBlockingQueue;
>>
>> public class ProducerThreadTest1 {
>>
>> /**
>>  * @param args
>>  * @throws InterruptedException
>>  */
>>  public static void main(String[] args) throws InterruptedException {
>> // TODO Auto-generated method stub
>>  int i = Integer.parseInt(args[0]);
>>  ThreadPoolExecutor threadPool = new ThreadPoolExecutor(i, i, 5,
>>  TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(i),
>> new ThreadPoolExecutor.DiscardOldestPolicy());
>>  int messageSize = Integer.parseInt(args[1]);
>>  StringBuffer messageStr = new StringBuffer();
>>  for(int messagesize=0;messagesize<messageSize;messagesize++)
>>      {
>>      messageStr.append("X");
>>      }
>>  String topic = args[2];
>> for(int j=0;j < i; j++)
>> {
>>    topic += "x";
>>    threadPool.execute(new ProducerThread(topic,messageStr.toString()));
>>    Thread.sleep(1000);
>>
>> }
>>  }
>>
>> }
>>
>>
>> the shell scripte kafkaThreadTest.sh like this:
>>
>> java -Xmx10G -jar kafkaThreadTest.jar 2 1024 a
>>
>> I deploy the shell at ten servers!
>>
>> Thanks!
>> Best Regards!
>>
>> Jian Fan
>>
>> 2012/7/13 Jun Rao <junrao@gmail.com>
>>
>>> That seems like a Kafka bug. Do you have a script that can reproduce
>>> this?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Thu, Jul 12, 2012 at 5:44 PM, jjian fan <xiaofanhadoop@gmail.com>
>>> wrote:
>>>
>>> > HI:
>>> > I use kafka0.7.1, here is the stack trace in kafka server:
>>> >
>>> >  ERROR Error processing MultiProducerRequest on bxx:2
>>> > (kafka.server.KafkaRequestHandlers)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> > at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> > at kafka.log.Log.append(Log.scala:205)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>> > at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>> > at java.lang.Thread.run(Thread.java:722)
>>> > [2012-07-13 08:40:06,182] ERROR Closing socket for /192.168.75.13because
>>> > of error (kafka.network.Processor)
>>> > kafka.message.InvalidMessageException: message is invalid, compression
>>> > codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>>> > at
>>> >
>>> >
>>> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>> > at
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>> > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>> > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>>> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>>> > at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>>> > at kafka.log.Log.append(Log.scala:205)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>>> > at
>>> >
>>> >
>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>>> > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>>> > at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>>> > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at
>>> >
>>> >
>>> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>>> > at kafka.network.Processor.handle(SocketServer.scala:296)
>>> > at kafka.network.Processor.read(SocketServer.scala:319)
>>> > at kafka.network.Processor.run(SocketServer.scala:214)
>>> > at java.lang.Thread.run(Thread.java:722)
>>> >
>>> > here is the track stace in kafka producer:
>>> > ERROR Connection attempt to 192.168.75.104:9092 failed, next attempt
>>> in
>>> > 60000 ms (kafka.producer.SyncProducer)
>>> > java.net.ConnectException: Connection refused
>>> > at sun.nio.ch.Net.connect(Native Method)
>>> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:525)
>>> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:173)
>>> > at
>>> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:196)
>>> > at kafka.producer.SyncProducer.send(SyncProducer.scala:92)
>>> > at kafka.producer.SyncProducer.multiSend(SyncProducer.scala:135)
>>> > at
>>> >
>>> kafka.producer.async.DefaultEventHandler.send(DefaultEventHandler.scala:58)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:44)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:116)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:95)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:71)
>>> > at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>>> > at
>>> >
>>> >
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:70)
>>> > at
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:41)
>>> >
>>> > The kafka producer is multi-thread program.
>>> >
>>> > Thanks!
>>> >
>>> > Best Regards!
>>> >
>>> >
>>> > 2012/7/13 Neha Narkhede <neha.narkhede@gmail.com>
>>> >
>>> > > In addition to Jun's question,
>>> > >
>>> > > which version are you using ? Do you have a reproducible test case
?
>>> > >
>>> > > Thanks,
>>> > > Neha
>>> > >
>>> > > On Thu, Jul 12, 2012 at 7:19 AM, Jun Rao <junrao@gmail.com> wrote:
>>> > > > What's the stack trace?
>>> > > >
>>> > > > Thanks,
>>> > > >
>>> > > > Jun
>>> > > >
>>> > > > On Thu, Jul 12, 2012 at 12:55 AM, jjian fan <
>>> xiaofanhadoop@gmail.com>
>>> > > wrote:
>>> > > >
>>> > > >> HI:
>>> > > >>
>>> > > >> Guys, I test kafka in our test high cocunnrent enivorment,
I
>>> always
>>> > get
>>> > > the
>>> > > >> error message as follows:
>>> > > >>
>>> > > >> ERROR Error processing MultiProducerRequest on axxxxxxxx:2
>>> > > >> (kafka.server.KafkaRequestHandlers)
>>> > > >> kafka.message.InvalidMessageException: message is invalid,
>>> compression
>>> > > >> codec: NoCompressionCodec size: 1034 curr offset: 3114 init
>>> offset: 0
>>> > > >>
>>> > > >> Can anyone help? Thanks!
>>> > > >>
>>> > > >> Best Regards
>>> > > >>
>>> > > >> Jian Fan
>>> > > >>
>>> > >
>>> >
>>>
>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message