kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: SimpleConsumer cannot read KeyedMessage<byte[], byte[]>.
Date Sat, 02 Nov 2013 03:45:27 GMT
Did you make sure the fetch size in the fetch request is larger than the
size of a single message?

Thanks,

Jun


On Fri, Nov 1, 2013 at 5:07 PM, Lu Xuechao <luxuec@gmail.com> wrote:

> The consumer starts from offset 0. Yes, in the log dir.
>
>
> On Fri, Nov 1, 2013 at 4:06 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > Which offset did you use for fetching? Is there data in the kafka log
> dir?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Nov 1, 2013 at 11:48 AM, Lu Xuechao <luxuec@gmail.com> wrote:
> >
> > > checked fetchResponse.hasError() but has no error.
> > >
> > >
> > > On Fri, Nov 1, 2013 at 7:45 AM, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > Did you check the error code associated with each partition in the
> > fetch
> > > > response?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 31, 2013 at 9:59 PM, Lu Xuechao <luxuec@gmail.com>
> wrote:
> > > >
> > > > > No. The simple consumer does receive some responses and can iterate
> > the
> > > > > loop:
> > > > >
> > > > > for (MessageAndOffset messageAndOffset :
> > > > fetchResponse.messageSet(m_topic,
> > > > > m_partition)) {
> > > > >  //handle messages
> > > > > }
> > > > >
> > > > > but after that, the response still returns will byte[], I can see
> the
> > > > > content, but the iterator cannot iterate:
> > > > >
> > > > > Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic,
> > > > > m_partition).iterator();
> > > > > itr.hasNext()  is FALSE.
> > > > >
> > > > > No error messages found.
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Oct 31, 2013 at 9:33 PM, Jun Rao <junrao@gmail.com>
wrote:
> > > > >
> > > > > > Is that related to
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whybrokersdonotreceiveproducersentmessages%3F
> > > > > > ?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 31, 2013 at 2:23 PM, Lu Xuechao <luxuec@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > It seems the reason is I enabled gzip compression.
> > > > > > >
> > > > > > > what the code would like to consume compressed messages?
> > > > > > >
> > > > > > > thanks.
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Oct 31, 2013 at 11:26 AM, Lu Xuechao <luxuec@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I am following the
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > > > > > > >
> > > > > > > > When I send KeyedMessage<String, String> with
StringEncoder,
> I
> > > can
> > > > > get
> > > > > > > the
> > > > > > > > messages sent:
> > > > > > > >
> > > > > > > > for (MessageAndOffset messageAndOffset :
> > > > > > > fetchResponse.messageSet(m_topic,
> > > > > > > > m_partition)) {
> > > > > > > >  //handle messages
> > > > > > > > }
> > > > > > > >
> > > > > > > > But when I send KeyedMessage<byte[], byte[]>
with
> > > DefaultEncoder, I
> > > > > > > cannot
> > > > > > > > get the messages:
> > > > > > > >
> > > > > > > > Iterator<MessageAndOffset> itr =
> > > fetchResponse.messageSet(m_topic,
> > > > > > > > m_partition).iterator();
> > > > > > > > itr.hasNext()  is FALSE.
> > > > > > > >
> > > > > > > > the test code is the same, what is causing this? What
change
> > > needs
> > > > to
> > > > > > be
> > > > > > > > made?
> > > > > > > >
> > > > > > > > thanks.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message