kafka-users mailing list archives

From Patricio Echagüe <patric...@gmail.com>
Subject Re: How to dequeue the last message again during failures
Date Thu, 05 Jan 2012 17:15:06 GMT
Thanks Jun for your answer.

Do I need to turn autocommit off on the consumer, since I'll be committing
offsets manually?

Sent from my Android
On Jan 5, 2012 8:50 AM, "Jun Rao" <junrao@gmail.com> wrote:

> Do you want to stop consumption when you hit a deserialization error, fix
> the bug in the deserializer, and then reconsume the last message? If so,
> you can explicitly call commitOffset periodically after successful
> deserialization of the messages. If you fail and restart after the bug is
> fixed, the last few messages will be replayed. Under the covers,
> commitOffset commits the offset of the last consumed message for each
> subscribed topic.
>
> Jun
>
>
> 2012/1/4 Patricio Echagüe <patricioe@gmail.com>
>
> > Hi folks, I was hoping to get some advice on how to design the following
> > use case.
> >
> > My code (consumer) reads messages from Topic A, per partition (at the
> > moment there is just 1). The consumer is single threaded per topic.
> > After reading/dequeuing the message, I get an error when trying to
> > deserialize it (this error is related to the way I serialize my objects
> > using JSON), which leaves my consumer unable to re-process the message
> > (since the message was already consumed). It is not a Kafka-related
> > issue, but it made me realize that I can lose messages.
> >
> > Ideally I would like to avoid "committing" to the broker that the message
> > has been consumed, wait until the message is processed successfully by my
> > consumer and, once I make sure I properly processed the message, then send
> > the acknowledgment to the broker indicating that this message can be
> > discarded.
> >
> > In case of an error, the broker should be able to re-send the same
> > message.
> >
> > What would be the way to achieve this?
> >
> > I see that MessageStream has a method called "commitOffset", but it
> > doesn't seem to apply to a particular topic.
> >
> > Am I approaching the problem from the wrong direction?
> >
> > Thanks
> > Patricio
> >
>
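
The commit-after-success pattern discussed in this thread can be sketched
roughly as follows. This is only an illustration, not the Kafka client API:
PartitionLog and ManualCommitConsumer are hypothetical in-memory stand-ins,
and the sketch assumes automatic offset commits are off, so that only the
explicit commit advances the stored offset. It commits after every message
rather than periodically, so a restart replays exactly the message that
failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for one partition; NOT the Kafka API. */
class PartitionLog {
    private final List<String> messages = new ArrayList<>();
    // Offset of the first message that has not been acknowledged yet.
    private int committedOffset = 0;

    void append(String message) { messages.add(message); }
    String read(int offset) { return messages.get(offset); }
    int size() { return messages.size(); }
    int committedOffset() { return committedOffset; }
    void commit(int nextOffset) { committedOffset = nextOffset; }
}

/** Hypothetical consumer loop that commits only after successful processing. */
class ManualCommitConsumer {
    /**
     * Starts at the committed offset, hands each message to the handler, and
     * commits after each success. Stops at the first failure without
     * committing, so the next run starts again at the failed message.
     * Returns the number of messages processed in this run.
     */
    static int run(PartitionLog log, Consumer<String> handler) {
        int processed = 0;
        for (int offset = log.committedOffset(); offset < log.size(); offset++) {
            try {
                handler.accept(log.read(offset));
            } catch (RuntimeException e) {
                // No commit here: the failed message will be replayed.
                return processed;
            }
            log.commit(offset + 1);
            processed++;
        }
        return processed;
    }
}
```

With a log holding "a", "bad", "c" and a handler that throws on "bad", the
first run processes "a" and leaves the committed offset at 1. A second run
with a fixed handler then starts at "bad" and continues through "c".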
