kafka-users mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: Help with encoding issue.
Date Wed, 04 Apr 2012 21:44:36 GMT
Yeah, I think I jumped to conclusions. The issue I was referring to was
just assuming the default encoding, which would not cause the issue
you described.

-Jay
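
For anyone who needs a workaround before that patch: below is a minimal sketch of a
producer-side encoder that pins the charset to UTF-8 instead of relying on the JVM's
default. It assumes the 0.7-era kafka.serializer.Encoder / kafka.message.Message API
shown in the quoted code further down; the class name and charset choice are illustrative.

import java.nio.charset.Charset;

import kafka.message.Message;
import kafka.serializer.Encoder;

public class Utf8StringEncoder implements Encoder<String> {

  // Pin the charset explicitly so the bytes on the wire do not depend on
  // the producer JVM's file.encoding setting.
  private static final Charset UTF8 = Charset.forName("UTF-8");

  public Message toMessage(String event) {
    // Message(byte[]) frames the raw payload with the usual Kafka header.
    return new Message(event.getBytes(UTF8));
  }
}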

2012/4/3 Patricio Echagüe <patricioe@gmail.com>:
> Interesting. What I can't explain though is why it works just fine when
> printing the string this way:
>
>      for (Message message : stream) {
>        ByteBuffer bb = message.payload().duplicate();
>        ByteBuffer bb2 = message.payload().duplicate();
>        byte[] bytes = new byte[bb2.remaining()];
>        bb2.get(bytes);
>        System.out.println("Message received string: " + new String(bytes));
>        consumerConnector.commitOffsets();
>      }
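
As an aside on the loop above: new String(bytes) also falls back to the platform
default charset, so the round trip only looks correct when the producer and consumer
JVMs happen to agree on file.encoding. A hedged one-line variant that makes the
consumer-side decode explicit (assuming the payload bytes are UTF-8):

        System.out.println("Message received string: "
            + new String(bytes, java.nio.charset.Charset.forName("UTF-8")));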
> Do you have a link to your patch, Jay?
> On Tue, Apr 3, 2012 at 4:32 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
>> This is our bug: we were taking the system default encoding (d'oh). I
>> have a patch for it that I was adding to 0.8; we can probably backport
>> it to older releases pretty easily too.
>>
>> -Jay
>>
>> 2012/4/3 Patricio Echagüe <patricioe@gmail.com>:
>> > Hi, I noticed that the String serializer somehow doesn't handle special
>> > characters such as "ü" correctly.
>> >
>> > I tried to create a ByteBufferEncoder this way:
>> >
>> > import java.nio.ByteBuffer;
>> >
>> > import kafka.message.Message;
>> >
>> > import kafka.serializer.Encoder;
>> >
>> >
>> > public class ByteBufferEncoder implements Encoder<ByteBuffer> {
>> >
>> >   public Message toMessage(ByteBuffer buffer) {
>> >
>> >     return new Message(buffer);
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> > but I get this exception [1]
>> >
>> >
>> > Could you guys please advise on how to fix my encoding issue?
>> >
>> > Thanks
>> >
>> >
>> > [1]
>> >
>> > Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
>> >   at kafka.message.Message.compressionCodec(Message.scala:144)
>> >   at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
>> >   at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>> >   at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>> >   at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> >   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> >   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> >   at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> >   at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> >   at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> >   at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
>> >   at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> >   at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
>> >   at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> >   at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> >   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>> >   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>> >   at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
>> >   at kafka.producer.Producer.zkSend(Producer.scala:143)
>> >   at kafka.producer.Producer.send(Producer.scala:105)
>> >   at kafka.javaapi.producer.Producer.send(Producer.scala:104)
>> >   at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
>>
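
On the separate "Invalid magic byte" failure in [1]: in the 0.7 API the
Message(ByteBuffer) constructor appears to expect a buffer that already holds a
complete, framed message (magic byte, attributes, CRC, then payload), while
Message(byte[]) builds that frame around a raw payload. Handing a raw payload
buffer to the former means the first payload byte gets read as the magic byte,
which is likely why the reported value is 34, the ASCII code for a double quote.
A hedged rewrite of the ByteBufferEncoder along those lines (the constructor
behaviour described here is my reading of the 0.7 code, not something confirmed
in this thread):

import java.nio.ByteBuffer;

import kafka.message.Message;
import kafka.serializer.Encoder;

public class ByteBufferEncoder implements Encoder<ByteBuffer> {

  public Message toMessage(ByteBuffer buffer) {
    // Copy the remaining payload bytes out of the buffer without disturbing
    // the caller's position, then let Message(byte[]) write the header.
    byte[] payload = new byte[buffer.remaining()];
    buffer.duplicate().get(payload);
    return new Message(payload);
  }
}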
