kafka-users mailing list archives

From Anjani Gupta <anjani.gu...@salesforce.com>
Subject Re: kafka_2.10-0.8.1 simple consumer retrieves junk data in the message
Date Tue, 31 Jan 2017 18:00:37 GMT
We use the following method to deserialize the message consumed using the
Simple Consumer:

DatumReader<T> datumReader = new SpecificDatumReader<>(className);
ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
T object = datumReader.read(null, decoder);
IOUtils.closeQuietly(inputStream);


It does not seem to handle the header bytes. When I remove those 26 bytes,
deserialization works fine. Please note that we are using the Simple Consumer
API, not the high-level consumer.
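
One possible explanation, not confirmed anywhere in this thread: ByteBuffer.array() returns the entire backing array and ignores the buffer's offset and limit. If payload() hands back a view into a larger fetch buffer, array() would also expose whatever bytes precede the payload. Assuming the 0.8 on-wire layout with a null key (8-byte offset, 4-byte size, 4-byte CRC, 1-byte magic, 1-byte attributes, 4-byte key length, 4-byte value length), that prefix would add up to 26 bytes. The sketch below uses only java.nio to illustrate the effect; the class PayloadCopy and its two helper methods are hypothetical names for illustration, not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadCopy {

    /** Copies only the readable bytes of the buffer; the caller's position is left untouched. */
    public static byte[] copyPayload(ByteBuffer payload) {
        byte[] bytes = new byte[payload.remaining()];
        payload.duplicate().get(bytes); // read through a duplicate so the original is not advanced
        return bytes;
    }

    /**
     * Hypothetical stand-in for a fetched message: headerLength filler bytes
     * followed by the body, returned as a slice that views the body only.
     */
    public static ByteBuffer sliceAfterHeader(int headerLength, byte[] body) {
        ByteBuffer whole = ByteBuffer.allocate(headerLength + body.length);
        whole.position(headerLength);
        whole.put(body);
        whole.position(headerLength);
        return whole.slice(); // shares the backing array, with arrayOffset() == headerLength
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer payload = sliceAfterHeader(26, body);

        System.out.println(payload.array().length); // 31: header bytes plus body
        System.out.println(payload.arrayOffset());  // 26: where the payload really starts
        System.out.println(new String(copyPayload(payload), StandardCharsets.UTF_8)); // hello
    }
}
```

If this is what is happening here, copying the payload with get() as in copyPayload, instead of calling array(), should yield exactly the bytes the producer sent, and the Avro decoder would then not need any bytes stripped by hand.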


On Mon, Jan 30, 2017 at 10:57 PM, Ewen Cheslack-Postava <ewen@confluent.io>
wrote:

> What are the 26 additional bytes? That sounds like a header that a
> decoder/deserializer is handling with the high level consumer. What class
> are you using to deserialize the messages with the high level consumer?
>
> -Ewen
>
> On Fri, Jan 27, 2017 at 10:19 AM, Anjani Gupta <
> anjani.gupta@salesforce.com>
> wrote:
>
> > I am using kafka_2.10-0.8.1 and trying to fetch messages using the Simple
> > Consumer API. I notice that the byte array for a retrieved message has 26
> > junk bytes prepended to the original message sent by the producer. Any
> > idea what's going on here? This works fine with the high-level consumer.
> >
> > This is what my code looks like:
> >
> >         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
> >         OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
> >                 new OffsetFetchRequest(GROUP_ID,
> >                         Collections.singletonList(topicAndPartition), (short) 0, 0, CLIENT_ID));
> >
> >         //Fetch messages from Kafka.
> >         FetchRequest req = new FetchRequestBuilder()
> >                 .clientId(CLIENT_ID)
> >                 .addFetch(topic, partition, readOffset, 1000)
> >                 .build();
> >         FetchResponse fetchResponse = consumer.fetch(req);
> >         for (MessageAndOffset messageAndOffset :
> >                 fetchResponse.messageSet(topicName, partition)) {
> >             byte[] message = messageAndOffset.message().payload().array();
> >         }
> >
> > Here, message has an additional 26 bytes prepended to the array.
> >
> >
> > Thanks,
> > Anjani
> >
>
