kafka-users mailing list archives

From Patricio Echagüe <patric...@gmail.com>
Subject Re: Help with encoding issue.
Date Tue, 03 Apr 2012 23:40:56 GMT
Interesting. What I can't explain though is why it works just fine when
printing the string this way:

      for (Message message : stream) {
        ByteBuffer bb = message.payload().duplicate();
        ByteBuffer bb2 = message.payload().duplicate();
        byte[] bytes = new byte[bb2.remaining()];
        bb2.get(bytes);
        System.out.println("Message received string: " + new String(bytes));
        consumerConnector.commitOffsets();
      }

Do you have a link to your patch, Jay?
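
A minimal sketch of the default-encoding problem discussed in this thread, using only the JDK and no Kafka classes. The class name CharsetRoundTrip and its helper methods are made up for illustration. It shows that a round trip is lossless when both sides name the same charset and garbles "ü" when they differ. That may be why printing with new String(bytes) looks fine whenever the producer and consumer happen to share the same platform default, though the thread does not confirm this.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetRoundTrip {

    // Encode with an explicit charset instead of relying on the platform default.
    public static byte[] encode(String text, Charset charset) {
        return text.getBytes(charset);
    }

    // Copy the remaining bytes out of a duplicate so the caller's buffer
    // position is left untouched, then decode with an explicit charset.
    public static String decode(ByteBuffer payload, Charset charset) {
        ByteBuffer copy = payload.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, charset);
    }

    public static void main(String[] args) {
        String original = "Echag\u00fce"; // contains "ü"
        ByteBuffer utf8 = ByteBuffer.wrap(encode(original, StandardCharsets.UTF_8));

        // Same charset on both sides: the text survives.
        System.out.println(decode(utf8, StandardCharsets.UTF_8).equals(original)); // prints true

        // Different charset on the decoding side: "ü" is garbled.
        System.out.println(decode(utf8, StandardCharsets.ISO_8859_1).equals(original)); // prints false
    }
}
```

Naming the charset on both the encoding and the decoding side removes the dependency on each machine's default.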
On Tue, Apr 3, 2012 at 4:32 PM, Jay Kreps <jay.kreps@gmail.com> wrote:

> This is our bug, we were taking the system default encoding (d'oh). I
> have a patch for it I was adding to 0.8, we can probably backport it
> for older releases too pretty easily.
>
> -Jay
>
> 2012/4/3 Patricio Echagüe <patricioe@gmail.com>:
> > Hi, I noticed that String Serializer somehow doesn't do well encoding
> > special characters such as "ü".
> >
> > I tried to create a ByteBufferEncoder this way:
> >
> > import java.nio.ByteBuffer;
> >
> > import kafka.message.Message;
> > import kafka.serializer.Encoder;
> >
> > public class ByteBufferEncoder implements Encoder<ByteBuffer> {
> >   public Message toMessage(ByteBuffer buffer) {
> >     return new Message(buffer);
> >   }
> > }
> >
> > but I get this exception [1]
> >
> >
> > Could you guys please advise on how to fix my encoding issue?
> >
> > Thanks
> >
> >
> > [1]
> >
> > Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
> >     at kafka.message.Message.compressionCodec(Message.scala:144)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> >     at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> >     at kafka.producer.Producer.zkSend(Producer.scala:143)
> >     at kafka.producer.Producer.send(Producer.scala:105)
> >     at kafka.javaapi.producer.Producer.send(Producer.scala:104)
> >     at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
>
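
One possible reading of the "Invalid magic byte 34" error in the quoted trace, offered as an assumption rather than something confirmed in this thread: if the Message(ByteBuffer) constructor expects an already framed message, then the first byte of the raw payload would be read as the header's magic byte. Byte 34 is the ASCII double quote, which would fit a payload that starts with a quoted string. The sketch below uses only the JDK; MagicByteDemo and firstByte are hypothetical names, and the sample payload is invented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MagicByteDemo {

    // Hypothetical helper: returns the byte at the buffer's current position,
    // which is where a header parser would look first. Uses an absolute get,
    // so the buffer's position is not advanced.
    public static byte firstByte(ByteBuffer buffer) {
        return buffer.get(buffer.position());
    }

    public static void main(String[] args) {
        // Invented payload that begins with a double quote character.
        ByteBuffer payload = ByteBuffer.wrap("\"hello\"".getBytes(StandardCharsets.UTF_8));
        System.out.println(firstByte(payload)); // prints 34
    }
}
```

If that reading is right, the encoder would need to hand Message the raw payload in a form that makes it build its own header. Whether such a constructor exists should be checked against the Message class of the Kafka version in use.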
