kafka-users mailing list archives

From Elias Levy <fearsome.lucid...@gmail.com>
Subject Re: Heartbeating during long processing times
Date Wed, 06 Jul 2016 16:30:53 GMT

Thanks for pointing me to KIP-62.  Once implemented, it will make workers
that take a long time processing messages a lot simpler to implement.
Until then, we have to continue using the pause/poll/resume pattern.  That
said, as far as I can tell, this pattern has not been well documented.

It appears the issue I observed is the result of consumer rebalancing. When
a consumer with paused partitions calls poll to trigger a heartbeat, the
client will process any pending consumer rebalances.  The rebalance will
potentially result in the addition of newly assigned unpaused partitions.
Worse, partitions that were already assigned and paused, and that
continue to be assigned to the client after the rebalance, will
become unpaused. I consider this a bug in the client.  Paused partitions
should not be unpaused during a rebalance if they continue to be assigned
to the client.  So pause/poll/resume is not sufficient for a worker that
handles messages with long processing times.  One must also implement a
ConsumerRebalanceListener that pauses all assigned partitions if the
consumer is in the middle of processing a message.
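
Here is a rough sketch of what I mean by such a listener.  To keep it free
of dependencies it does not import the Kafka client: the type parameter P
stands in for TopicPartition, the pause action stands in for
consumer.pause(Collection<TopicPartition>), and the class and method names
(PauseWhileBusyListener, startProcessing, finishProcessing) are made up for
illustration.  It also assumes the callback is handed every partition that
should be paused.  Treat it as an outline, not a tested implementation.

```java
import java.util.Collection;
import java.util.function.Consumer;

/**
 * Sketch of a rebalance callback that re-pauses partitions while a record is
 * still being processed. P is a stand-in for Kafka's TopicPartition, and the
 * pause action is a stand-in for KafkaConsumer.pause(Collection).
 */
final class PauseWhileBusyListener<P> {
    private final Consumer<Collection<P>> pause; // e.g. consumer::pause
    private volatile boolean busy = false;       // true while a record is in flight

    PauseWhileBusyListener(Consumer<Collection<P>> pause) {
        this.pause = pause;
    }

    /** Call just before starting work on a long-running record. */
    void startProcessing() {
        busy = true;
    }

    /** Call after the record is processed and its offset committed. */
    void finishProcessing() {
        busy = false;
    }

    /** Mirrors ConsumerRebalanceListener.onPartitionsRevoked; nothing to do. */
    void onPartitionsRevoked(Collection<P> partitions) {
    }

    /**
     * Mirrors ConsumerRebalanceListener.onPartitionsAssigned. If a record is
     * still being processed, pause every partition handed to the callback so
     * that the heartbeat poll does not return records from them.
     */
    void onPartitionsAssigned(Collection<P> partitions) {
        if (busy) {
            pause.accept(partitions);
        }
    }
}
```

In a real worker the class would implement ConsumerRebalanceListener with P
fixed to TopicPartition and be passed to consumer.subscribe() along with the
topic list.  The worker would call startProcessing() together with
consumer.pause(consumer.assignment()), and finishProcessing() together with
consumer.resume(consumer.paused()).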

On Fri, Jul 1, 2016 at 11:52 AM, Shikhar Bhushan <shikhar@confluent.io>
wrote:

> Hi Elias,
> KIP-62
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread>
> has a discussion of current options, and the improvements that are coming.
> Best,
> Shikhar
> On Thu, Jun 30, 2016 at 6:02 PM Elias Levy <fearsome.lucidity@gmail.com>
> wrote:
> > What is the officially recommended method to heartbeat using the new Java
> > consumer during long message processing times?
> >
> > I thought I could accomplish this by setting max.poll.records to 1 in the
> > client, calling consumer.pause(consumer.assignment()) when starting to
> > process a record, calling consumer.resume(consumer.paused()) when done
> > processing a record and committing its offset, and calling
> > consumer.poll(0) intermittently while processing the record.
> >
> > My testing shows that consumer.poll(0) returns records, rather than
> > returning null or an empty ConsumerRecords.
> >
