kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: How to dequeue the last message again during failures
Date Thu, 05 Jan 2012 17:56:15 GMT
When you call commitOffset, we commit the offsets for all subscribed topics.
If you want topic-level control, you currently have to use multiple
consumer connectors, one per topic.

Jun
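
A minimal sketch of the difference described above. It uses a hypothetical
in-memory Connector class, not the real Kafka consumer API; the class name,
its methods, and the topic names "A" and "B" are all assumptions made for
illustration. It only shows why a commit on a shared connector also covers
topics whose messages are not yet processed, while one connector per topic
keeps the commits independent.

```python
class Connector:
    """Hypothetical stand-in for a consumer connector (not the Kafka API)."""

    def __init__(self, topics):
        # Position of the next message to read, per subscribed topic.
        self.consumed = {topic: 0 for topic in topics}
        # Last committed position, per subscribed topic.
        self.committed = {topic: 0 for topic in topics}

    def consume(self, topic):
        # Pretend one message was read from the topic.
        self.consumed[topic] += 1

    def commit_offsets(self):
        # Commits the consumed position of every subscribed topic at once.
        self.committed = dict(self.consumed)


# One connector shared by both topics: committing for A also commits B.
shared = Connector(["A", "B"])
shared.consume("A")
shared.consume("B")  # suppose B's message is read but not yet processed
shared.commit_offsets()
shared_committed = dict(shared.committed)

# One connector per topic: each thread commits only its own topic.
conn_a = Connector(["A"])
conn_b = Connector(["B"])
conn_a.consume("A")
conn_b.consume("B")
conn_a.commit_offsets()  # B stays uncommitted
per_topic_committed = {**conn_a.committed, **conn_b.committed}
```

With the shared connector, topic B's position is committed even though its
thread has not finished; with separate connectors, B would be replayed after
a restart.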

2012/1/5 Patricio Echagüe <patricioe@gmail.com>

> Thanks Jun. So how do I commit per topic? I'm still missing that part.
> Since I have one thread per topic, I have no control over the state of the
> other topics, and I want each thread to commit as soon as it successfully
> consumes and processes each message.
>
> Sent from my Android
> On Jan 5, 2012 9:23 AM, "Jun Rao" <junrao@gmail.com> wrote:
>
> > Yes, if you want full control.
> >
> > Thanks,
> >
> > Jun
> >
> > 2012/1/5 Patricio Echagüe <patricioe@gmail.com>
> >
> > > Thanks Jun for your answer.
> > >
> > > Do I need to turn autocommit off on the consumer, since I'll be
> > > committing manually?
> > >
> > > Sent from my Android
> > > On Jan 5, 2012 8:50 AM, "Jun Rao" <junrao@gmail.com> wrote:
> > >
> > > > Do you want to stop consumption when you hit a deserialization error,
> > > > fix the bug in the deserializer, and reconsume the last message? If so,
> > > > you can explicitly call commitOffset periodically after successful
> > > > deserialization of the messages. If you fail and restart after the bug
> > > > is fixed, the last few messages will be replayed. Under the covers,
> > > > commitOffset commits the offset of the last consumed message for each
> > > > subscribed topic.
> > > >
> > > > Jun
> > > >
> > > >
> > > > 2012/1/4 Patricio Echagüe <patricioe@gmail.com>
> > > >
> > > > > Hi folks, I was hoping to get some advice on how to design the
> > > > > following use case.
> > > > >
> > > > > My code (consumer) reads messages from Topic A, per partition (at the
> > > > > moment there is just 1). The consumer is single threaded per topic.
> > > > > After reading/dequeuing a message, I get an error when trying to
> > > > > deserialize it (the error is related to the way I serialize my
> > > > > objects using JSON), and my consumer cannot re-process the message
> > > > > since it was already consumed. It is not a Kafka-related issue, but
> > > > > it made me realize that I can lose messages.
> > > > >
> > > > > Ideally I would like to avoid "committing" to the broker that the
> > > > > message has been consumed, wait until my consumer has processed it
> > > > > successfully, and only then send the acknowledgement to the broker
> > > > > indicating that this message can be discarded.
> > > > >
> > > > > In case of an error, the broker should be able to re-send the same
> > > > > message.
> > > > >
> > > > > What would be the way to achieve this?
> > > > >
> > > > > I see that MessageStream has a method called "commitOffset", but it
> > > > > doesn't seem to apply to a particular topic.
> > > > >
> > > > > Am I approaching the problem from the wrong direction?
> > > > >
> > > > > Thanks
> > > > > Patricio
> > > > >
> > > >
> > >
> >
>
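
The process-then-commit pattern discussed in this thread can be sketched as
follows. This is an in-memory simulation under stated assumptions, not Kafka
code: InMemoryConsumer, poll, commit_offset, restart, run, and
fixed_deserializer are hypothetical names, and the second log entry is
deliberately malformed JSON to stand in for the deserialization bug. The
offset is committed only after a message deserializes successfully, so a
restart replays the failed message.

```python
import json


class InMemoryConsumer:
    """Hypothetical consumer over a list; stands in for a real connector."""

    def __init__(self, log):
        self.log = log
        self.committed = 0  # position that survives a restart
        self.position = 0   # position of the next message to hand out

    def restart(self):
        # After a restart, consumption resumes from the last committed offset.
        self.position = self.committed

    def poll(self):
        if self.position >= len(self.log):
            return None
        message = self.log[self.position]
        self.position += 1
        return message

    def commit_offset(self):
        self.committed = self.position


def run(consumer, deserialize):
    """Consume until the log ends or deserialization fails."""
    processed = []
    while True:
        raw = consumer.poll()
        if raw is None:
            break
        try:
            value = deserialize(raw)
        except ValueError:
            break  # stop WITHOUT committing, so the message is replayed
        processed.append(value)
        consumer.commit_offset()  # commit only after successful processing
    return processed


def fixed_deserializer(raw):
    # The "bug fix" for this example: tolerate single-quoted JSON.
    return json.loads(raw.replace("'", '"'))


log = ['{"id": 1}', "{'id': 2}", '{"id": 3}']
consumer = InMemoryConsumer(log)

first_run = run(consumer, json.loads)  # fails on the second message
committed_after_failure = consumer.committed

consumer.restart()  # restart after the deserializer is fixed
second_run = run(consumer, fixed_deserializer)
```

Committing after every message, as here, gives the tightest replay window;
committing periodically instead trades a few more replayed messages for
fewer commits.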
