kafka-users mailing list archives

From Patricio Echagüe <patric...@gmail.com>
Subject Re: How to dequeue the last message again during failures
Date Thu, 05 Jan 2012 17:43:41 GMT
Thanks, Jun. So how do I commit per topic? I'm still missing that part.
Since I have one thread per topic, I have no control over the state of the
other topics, and I want each thread to commit as soon as it successfully
consumes and processes each message.
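
[A minimal sketch of one way to get per-topic commits, assuming the 0.7-era
Java high-level consumer that this thread discusses. The class names,
property keys, and the one-connector-per-topic approach are assumptions, not
confirmed in the thread: since commitOffsets() acts on everything a connector
subscribes to, giving each topic its own ConsumerConnector scopes the commit
to that topic.]

    // Hypothetical sketch: one ConsumerConnector per topic, so that
    // commitOffsets() only moves this topic's offsets. Names follow the
    // Kafka 0.7-era Java API and may differ across releases.
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaMessageStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    public class PerTopicConsumer implements Runnable {
        private final ConsumerConnector connector;
        private final String topic;

        public PerTopicConsumer(String topic) {
            this.topic = topic;
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181");
            props.put("groupid", "my-group");
            props.put("autocommit.enable", "false"); // commit manually instead
            connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }

        public void run() {
            Map<String, List<KafkaMessageStream<Message>>> streams =
                connector.createMessageStreams(Collections.singletonMap(topic, 1));
            for (Message message : streams.get(topic).get(0)) {
                process(message);          // deserialize + handle; may throw
                connector.commitOffsets(); // only this connector's (topic's) offsets
            }
        }

        private void process(Message message) {
            // deserialize the JSON payload and apply business logic here
        }
    }

[Each topic then runs in its own thread, e.g.
new Thread(new PerTopicConsumer("topicA")).start(), and a commit in one
thread never touches another topic's offsets, because each connector tracks
only its own subscription.]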

Sent from my Android
On Jan 5, 2012 9:23 AM, "Jun Rao" <junrao@gmail.com> wrote:

> Yes, if you want full control.
>
> Thanks,
>
> Jun
>
> 2012/1/5 Patricio Echagüe <patricioe@gmail.com>
>
> > Thanks Jun for your answer.
> >
> > Do I need to turn autocommit off on the consumer, since I'll be doing it
> > manually?
> >
> > Sent from my Android
> > On Jan 5, 2012 8:50 AM, "Jun Rao" <junrao@gmail.com> wrote:
> >
> > > Do you want to stop consumption when you hit a deserialization error,
> > > fix the bug in the deserializer, and reconsume the last message? If
> > > so, you can explicitly call commitOffset periodically after successful
> > > deserialization of the messages. If you fail and restart after the bug
> > > is fixed, the last few messages will be replayed. Under the covers,
> > > commitOffset commits the offset of the last consumed message for each
> > > subscribed topic.
> > >
> > > Jun
> > >
> > >
> > > 2012/1/4 Patricio Echagüe <patricioe@gmail.com>
> > >
> > > > Hi folks, I was hoping to get some advice on how to design the
> > > > following use case.
> > > >
> > > > My consumer code reads messages from Topic A, per partition (which
> > > > at the moment is just one partition). The consumer is
> > > > single-threaded per topic. After reading/dequeuing the message, I
> > > > get an error when trying to deserialize it (the error is related to
> > > > the way I serialize my objects using JSON), making my consumer
> > > > unable to re-process the message (since the message was already
> > > > consumed). It is not a Kafka-related issue, but it made me realize
> > > > that I can lose messages.
> > > >
> > > > Ideally I would like to avoid "committing" to the broker that the
> > > > message has been consumed, wait until the message is processed
> > > > successfully by my consumer, and, once I am sure I have properly
> > > > processed the message, then send the acknowledgement to the broker
> > > > indicating that the message can be discarded.
> > > >
> > > > In case of an error, the broker should be able to re-send the same
> > > > message.
> > > >
> > > > What would be the way to achieve this?
> > > >
> > > > I see that MessageStream has a method called "commitOffset", but it
> > > > doesn't seem to apply to a particular topic.
> > > >
> > > > Am I approaching the problem from the wrong direction?
> > > >
> > > > Thanks
> > > > Patricio
> > > >
> > >
> >
>
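
[For reference, a minimal sketch of the consume loop Jun describes above,
under the same 0.7-era API assumptions as the earlier sketch. The MyEvent
type and the deserialize() and handle() helpers are hypothetical placeholders
for the application's JSON deserializer and business logic: autocommit is
off, and commitOffsets() is called only after a message both deserializes and
processes successfully, so a deserialization bug leaves the offset at the
last good message and that message is replayed after a restart.]

    // Sketch of the commit-after-success pattern (assumed 0.7-era API).
    static void consumeLoop(ConsumerConnector connector,
                            KafkaMessageStream<Message> stream) {
        for (Message message : stream) {
            MyEvent event = deserialize(message); // hypothetical; throws on a
                                                  // serialization bug
            handle(event);                        // hypothetical business logic
            // Reaching here means full success, so acknowledge everything
            // consumed so far. If deserialize() or handle() threw, we exit
            // without committing; the offset stays at the last good message
            // and this one is redelivered after the bug is fixed and the
            // consumer restarts.
            connector.commitOffsets();
        }
    }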
