kafka-users mailing list archives

From Jan Rudert <rud...@gmail.com>
Subject Re: commitOffsets() in multithreaded consumer process
Date Sat, 10 Aug 2013 07:51:44 GMT
Thank you!

So I guess, you suggest a really really small timeout so that the other
consuming threads don't get regularly blocked for the timeout period? My
consumer use case does not allow having "longer" breaks because there are
some high traffic topics.
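
To make the idea concrete, here is a minimal sketch of the barrier-plus-timeout pattern. It is not Kafka code. Each stream is stood in for by a BlockingQueue, and poll() with a short timeout plays the role of consumer.timeout.ms (in the real consumer the timeout surfaces as an exception from the stream iterator, as described in the quoted reply). The names BarrierCommitSketch, runDemo, COMMIT_EVERY and POLL_TIMEOUT_MS are made up for illustration, and the barrier action only bumps a counter where real code would call commitOffsets().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierCommitSketch {
    static final int COMMIT_EVERY = 5;      // hypothetical global message limit
    static final long POLL_TIMEOUT_MS = 20; // plays the role of consumer.timeout.ms

    /** Runs the demo and returns {messages processed, commits performed}. */
    static int[] runDemo() {
        final int threads = 3, perBusyStream = 10, total = 2 * perBusyStream;
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger uncommitted = new AtomicInteger();
        AtomicInteger commits = new AtomicInteger();

        // The barrier action runs while every thread is parked in await(),
        // so nothing is being consumed. Real code would call commitOffsets() here.
        CyclicBarrier barrier = new CyclicBarrier(threads, () -> {
            uncommitted.set(0);
            commits.incrementAndGet();
        });

        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < threads; i++) streams.add(new LinkedBlockingQueue<>());
        for (int i = 0; i < perBusyStream; i++) {
            streams.get(0).add("a" + i);
            streams.get(1).add("b" + i);
        } // stream 2 stays empty: the topic with no traffic

        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (BlockingQueue<String> stream : streams) {
            pool.execute(() -> {
                try {
                    while (true) {
                        // Bounded wait instead of blocking forever in next().
                        String msg = stream.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                        if (msg != null) {
                            processed.incrementAndGet();
                            uncommitted.incrementAndGet();
                        }
                        // Checked after every message and after every timeout,
                        // so an idle thread still reaches the barrier.
                        if (uncommitted.get() >= COMMIT_EVERY) barrier.await();
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // Shutdown: fall through and let the thread end.
                }
            });
        }

        try {
            long deadline = System.currentTimeMillis() + 5_000;
            while (processed.get() < total && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
            pool.shutdownNow(); // interrupts threads waiting in poll() or await()
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {processed.get(), commits.get()};
    }

    public static void main(String[] args) {
        int[] r = runDemo();
        System.out.println("processed=" + r[0] + " commits=" + r[1]);
    }
}
```

In this sketch the busy threads wait at the barrier for at most roughly one timeout period before the idle thread joins them, so the timeout value bounds the pause on the high traffic streams. The counter can overshoot the limit slightly, because a thread may finish one more message before it notices that a commit is due.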

Thanks
Jan


2013/8/10 Jun Rao <junrao@gmail.com>

> The consumer has a config property called consumer.timeout.ms. By setting
> the value to a positive integer, a timeout exception is thrown to the
> consumer if no message is available for consumption after the specified
> timeout value.
>
> Thanks,
> Jun
>
>
> On Fri, Aug 9, 2013 at 9:25 AM, Jan Rudert <rudert@gmail.com> wrote:
>
> > Hi,
> >
> > I have a consumer application where I have a message stream per topic and
> > one thread per stream.
> >
> > I will do a commitOffsets() when a global shared message counter
> > reaches a limit.
> >
> > I think I need to make sure that no thread is consuming while I call
> > commitOffsets() to ensure that no concurrent consuming error happens in one
> > of the threads.
> >
> > Therefore I use a CyclicBarrier in my threads and do the commitOffsets() in
> > the barrier action.
> >
> > The problem arises in case thread A is blocked in stream.next() when there
> > is no traffic in the topic. When the other threads are blocked in
> > barrier.await() they have to wait until A receives a message. This can
> > possibly block all consuming.
> >
> > Is there a best practice on committing properly in a multithreaded
> > consumer?
> >
> > Thank you!
> > Jan
> >
>
