kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: advice on using compressionCodec
Date Tue, 11 Sep 2012 16:12:04 GMT
Raymond,

The consumer iterator will decompress the messages and then return to
the client. So you don't need to explicitly decompress data.

Thanks,
Neha

On Tue, Sep 11, 2012 at 8:30 AM, Raymond Ng <raymondair@gmail.com> wrote:
> Hi all
>
> I'm using kafka 0.7.0 and trying to process a message thats been GZIP
> compressed
>
> my test program which writes to a kafka queue has compression.codec = "1",
> and i can see the message has been written to the kafka server in a
> compressed format
>
> but I'm having trouble decompressing the message on the other end, my code
> as follows
>
> private void getMessage() {
>    SimpleConsumer consumer =
> kafkaPartitionsConns.getConsumer(currentPartition);
>    int hostPartition =
> kafkaPartitionsConns.getHostPartition(currentPartition);
>
>    ByteBufferMessageSet msgs = consumer.fetch(
>      new FetchRequest(
>        kafkaConf.topic,
>        hostPartition,
>        msgOffset,
>        kafkaConf.fetchSizeBytes));
>    java.util.Iterator <MessageAndOffset> compressedMsgSet = msgs.iterator();
>
>    while(compressedMsgSet.hasNext()) {
>     MessageAndOffset mao = compressedMsgSet.next();
>     Message msg = mao.message();
>     logger.info("msg : "+msg.toString());
>     Iterator decompressedMsgSet =
> CompressionUtils.decompress(mao.message()).iterator();
>     while (decompressedMsgSet.hasNext()) {
>      msgAwaiting.add(decompressedMsgSet.next());
>     }
>    }
> }
>
> 2012-09-11 16:16:24,581  INFO [Thread-3] KafkaTest.java - msg :
> message(magic = 1, attributes = 0, crc = 3908262406, payload =
> java.nio.HeapByteBuffer[pos=0 lim=7 cap=7])
> 2012-09-11 16:16:24,582 ERROR [Thread-3] PollableSourceRunner.java -
> Unhandled exception, logging and sleeping for 5000ms
> kafka.common.UnknownCodecException: Unknown Codec: NoCompressionCodec
>  at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:142)
>  at kafka.message.CompressionUtils.decompress(CompressionUtils.scala)
>
> I need to parse the message back to String for further processing, but
> can't work out why the original message didn't carry any compressionCodec
> any advice will be appreciated
>
> --
> Rgds
> Ray

Mime
View raw message