kafka-users mailing list archives

From Jan Rudert <rud...@gmail.com>
Subject commitOffsets() in multithreaded consumer process
Date Fri, 09 Aug 2013 16:25:25 GMT
Hi,

I have a consumer application with one message stream per topic and
one thread per stream.

I call commitOffsets() when a global shared message counter reaches a
limit.

I think I need to make sure that no thread is consuming while I call
commitOffsets(), so that no concurrency error occurs in one of the
threads.

Therefore I use a CyclicBarrier in my threads and call commitOffsets() in
the barrier action.
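
The setup described above could be sketched roughly as follows. This is a
simplified illustration, not the actual application code: the Kafka calls are
replaced by stand-ins (a counter increment instead of stream.next(), a commit
counter instead of commitOffsets()), and the names BarrierCommitSketch, run,
limit and commitsWanted are made up for the example. It assumes every thread
always has a message available.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierCommitSketch {

    // Runs `threads` workers until `commitsWanted` commits have happened
    // and returns the number of commits actually performed.
    static int run(int threads, int limit, int commitsWanted) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();         // global shared message counter
        AtomicInteger commits = new AtomicInteger();         // counts the stand-in commits
        AtomicBoolean commitRequested = new AtomicBoolean(); // set once the limit is reached
        AtomicBoolean done = new AtomicBoolean();            // tells the workers to stop

        // Barrier action: runs in one thread while all others wait in await().
        // In the real application this is where commitOffsets() would be called.
        CyclicBarrier barrier = new CyclicBarrier(threads, () -> {
            commits.incrementAndGet();
            counter.set(0);
            commitRequested.set(false);
            if (commits.get() >= commitsWanted) {
                done.set(true);
            }
        });

        Runnable worker = () -> {
            try {
                while (!done.get()) {
                    // Stand-in for stream.next(): a message is always available here.
                    if (counter.incrementAndGet() >= limit) {
                        commitRequested.set(true);
                    }
                    if (commitRequested.get()) {
                        barrier.await(); // pause consuming until the commit has run
                    }
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] pool = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            pool[i] = new Thread(worker);
            pool[i].start();
        }
        for (Thread t : pool) {
            t.join();
        }
        return commits.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commits: " + run(3, 100, 5));
    }
}
```

Because the barrier action only runs once every thread has reached
barrier.await(), no thread is consuming while the commit happens.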

The problem arises when thread A is blocked in stream.next() because there
is no traffic on its topic. The other threads, blocked in barrier.await(),
have to wait until A receives a message. This can possibly block all
consuming.
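
The stall can be reproduced without Kafka in a small sketch like the one
below. Everything in it is a stand-in chosen for illustration: an empty
BlockingQueue plays the idle topic, take() plays stream.next(), and the class
name BarrierStallSketch is hypothetical. The timed await() is used only so
that the demonstration terminates; with a plain await(), threads B and C
would wait for as long as A receives nothing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierStallSketch {

    // Returns the number of commits that ran; with thread A stuck this stays 0.
    static int run(long waitMillis) throws InterruptedException {
        AtomicInteger commits = new AtomicInteger();
        // The barrier action is the stand-in for commitOffsets().
        CyclicBarrier barrier = new CyclicBarrier(3, () -> commits.incrementAndGet());
        // Stand-in for a topic without traffic: nothing is ever added.
        BlockingQueue<String> idleTopic = new LinkedBlockingQueue<>();

        // Thread A: blocked in take(), so it never reaches the barrier.
        Thread a = new Thread(() -> {
            try {
                idleTopic.take();
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // Interrupted during cleanup below; just exit.
            }
        });

        // Threads B and C: already at the barrier, waiting for A.
        Runnable waiter = () -> {
            try {
                barrier.await(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                // Expected: A never arrives, so the timed wait gives up.
            }
        };
        Thread b = new Thread(waiter);
        Thread c = new Thread(waiter);

        a.start();
        b.start();
        c.start();
        b.join();
        c.join();
        a.interrupt(); // unblock A so the demonstration can exit
        a.join();
        return commits.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("commits while A was blocked: " + run(200));
    }
}
```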

Is there a best practice on committing properly in a multithreaded consumer?

Thank you!
Jan