kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jun Rao <jun...@gmail.com>
Subject Re: commitOffsets() in multithreaded consumer process
Date Sat, 10 Aug 2013 04:16:26 GMT
The consumer has a config property called consumer.timeout.ms. By setting
the value to a positive integer, a timeout exception is thrown to the
consumer if no message is available for consumption after the specified
timeout value.

Thanks,
Jun


On Fri, Aug 9, 2013 at 9:25 AM, Jan Rudert <rudert@gmail.com> wrote:

> Hi,
>
> I have an consumer application where I have a message stream per topic and
> one thread per stream.
>
> I will do a commitOffsets() when a global shared message counter is
> reaching a limit.
>
> I think I need to make sure that no thread is consuming while I call
> commitOffsets() to ensure that no concurrent consuming error happens in one
> of the threads.
>
> Therefor I use a CyclicBarrier in my threads and do the commitOffsets() in
> the barrier action.
>
> The problem arises in case thread A is blocked in stream.next() when there
> is no traffic in the topic. When the other threads are blocked in
> barrier.await() they have to wait until A receives a message. This can
> possible block all consuming.
>
> Is there a best practice on committing properly in a multithreaded
> consumer?
>
> Thank you!
> Jan
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message