kafka-users mailing list archives

From vinay sharma <vinsharma.t...@gmail.com>
Subject RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
Date Tue, 26 Apr 2016 12:23:59 GMT
Hi Phil,

CommitSync sends a heartbeat request on each call, but it seems that somehow
it stops sending heartbeat requests after a metadata refresh until the next
poll. I asked about this on the dev list and learned that this is fixed in
0.10.0.0, which is the next version. I have not gone into the details of the
defect, but it seems the fix relates to resetting the heartbeat task's timer
so that the next heartbeat request time is calculated correctly. From the
next version commitSync will act as a heartbeat, as per the defect.

Regards,
Vinay Sharma
On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <phil.luckhurst@encycle.com>
wrote:

> Hi Vinay,
>
> "Regarding identifying a rebalance, how about comparing array used for
> consumer pause with current assignments of consumer?"
>
> I did consider checking that in the commitSync exception handler, but I
> didn't try it because if this is the consumer that caused the rebalance
> (i.e. the one that appears to be dead) I didn't think its partition
> assignments would have been updated when handling the exception - the
> ConsumerRebalanceListener callbacks have not yet been called. I can give
> it a try though. That's why I thought having commitSync throw an explicit
> 'rebalance in progress' type exception, rather than just a KafkaException,
> would allow this to be easily identified and handled.
>
> The information about the metadata request is useful, I'll watch out for
> that if we change our commit logic.
>
> Thanks
> Phil Luckhurst
>
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> Sent: 25 April 2016 20:30
> To: users@kafka.apache.org
> Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
>
> Hi Phil,
>
> Regarding identifying a rebalance, how about comparing the array used for
> the consumer pause with the current assignments of the consumer?
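A standalone sketch of that comparison, using plain strings in place of TopicPartition so it runs without a broker (with the real API you would keep the Set&lt;TopicPartition&gt; captured at pause time and compare it to consumer.assignment()):

```java
import java.util.HashSet;
import java.util.Set;

public class AssignmentCheck {
    // True if the assignment captured when the consumer was paused still
    // matches the current assignment, i.e. no rebalance moved partitions.
    static boolean assignmentUnchanged(Set<String> pausedAssignment,
                                       Set<String> currentAssignment) {
        return pausedAssignment.equals(currentAssignment);
    }

    public static void main(String[] args) {
        Set<String> paused = new HashSet<>();
        paused.add("mytopic-0");
        paused.add("mytopic-1");

        Set<String> current = new HashSet<>(paused);
        System.out.println(assignmentUnchanged(paused, current)); // true

        current.remove("mytopic-1"); // partition revoked by a rebalance
        System.out.println(assignmentUnchanged(paused, current)); // false
    }
}
```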
>
> Regarding the rebalance after a metadata refresh request: that will not
> happen if you are committing after each record. I have a session timeout of
> 30000 ms, and if I commit the last processed records before the session
> times out then everything is fine, except that after a metadata refresh
> request I see a rebalance which causes "Error UNKNOWN_MEMBER_ID occurred
> while committing offsets" on further commits from the consumer until the
> next poll. This error means that even committing at regular intervals
> (which sends a heartbeat) somehow does not save the consumer from timing
> out during a metadata refresh. This issue does not happen if I commit after
> each record, that is every 2-4 seconds, or if a commit happens right after
> the metadata refresh response.
>
> Regards,
> Vinay Sharma
>
>
> On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst <
> phil.luckhurst@encycle.com
> > wrote:
>
> > Hi Vinay,
> >
> > I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> > offsets) after each message to write just the partition offset for that
> > specific message. Our messages can take several seconds to process and
> > this only seems to add 1 or 2 milliseconds to that time, so it does not
> > look like a significant overhead and it acts as our heartbeat.
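The per-message commit builds a single-entry offsets map, and the committed value must be the offset of the next message to consume, i.e. record.offset() + 1. A runnable sketch of just that arithmetic, with plain types standing in for TopicPartition and OffsetAndMetadata so it needs no broker:

```java
import java.util.Collections;
import java.util.Map;

public class PerMessageCommit {
    // The committed offset is the position of the NEXT record to read, so it
    // is the processed record's offset plus one. With the real client this
    // would be a Map<TopicPartition, OffsetAndMetadata> passed to
    // consumer.commitSync(offsets).
    static Map<String, Long> commitEntry(String topicPartition, long processedOffset) {
        return Collections.singletonMap(topicPartition, processedOffset + 1);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = commitEntry("mytopic-0", 41L);
        System.out.println(offsets.get("mytopic-0")); // 42
    }
}
```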
> >
> > WRT "I see that kafka sends a metadata refresh request after every
> > 300000 ms (default) and even though nothing changed (no new consumer,
> > broker or partition) this refresh generally triggers a rebalance (at
> > least in my tests)."
> >
> > I'm not seeing this. We've got a ConsumerRebalanceListener implemented
> > on our consumers and I don't see it get called even though I see lots
> > of metadata requests being sent. We can also have quiet periods where
> > we often exceed the 300000 ms refresh default, and those metadata
> > requests don't trigger a rebalance either.
> >
> > I'm calling consumer.pause(consumer.assignment().toArray(new
> > TopicPartition[0])) at the start of each batch and
> > consumer.resume(consumer.assignment().toArray(new TopicPartition[0]))
> > at the end. This allows us to call poll(0) in the message loop if we
> > need to block on a message for more than session.timeout.ms (this can
> > happen if an external system is temporarily unavailable). Again this
> > seems to work OK and does not trigger a rebalance.
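A sketch of that pause/poll/resume pattern. The Kafka calls are shown as comments; the runnable part is the timing check deciding when a heartbeat poll(0) is due while blocked on an external system (the two-thirds safety margin is an illustrative assumption, not from the original mails):

```java
public class HeartbeatWhileBlocked {
    // True once enough of the session timeout has elapsed that the consumer
    // should call poll(0) to send a heartbeat before it is deemed dead.
    static boolean heartbeatDue(long elapsedMs, long sessionTimeoutMs) {
        return elapsedMs >= (sessionTimeoutMs * 2) / 3;
    }

    public static void main(String[] args) {
        // Pattern, per the mail above (pseudo-wiring in comments):
        // consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
        // while (blockedOnExternalSystem) {
        //     if (heartbeatDue(elapsedMs, sessionTimeoutMs)) {
        //         consumer.poll(0); // partitions are paused, so no records
        //     }
        // }
        // consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
        System.out.println(heartbeatDue(10_000, 30_000)); // false
        System.out.println(heartbeatDue(20_000, 30_000)); // true
    }
}
```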
> >
> > The only issue we've found is, as mentioned before, when a rebalance
> > occurs while we are processing a batch of messages. When that happens
> > the commitSync fails with a KafkaException whose message states this is
> > due to a rebalance. We'd like to skip the rest of the batch when this
> > happens, but to do that we'd need to know for sure that it was because
> > of a rebalance, and a KafkaException could be thrown for other reasons.
> > A KafkaRebalanceException, or even a method we could call on the
> > consumer, would allow us to safely abort the current processing loop
> > knowing that the remaining messages would be picked up by another
> > consumer after the rebalance - that would stop us processing duplicates.
> >
> > Thanks
> > Phil Luckhurst
> >
> >
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> > Sent: 22 April 2016 14:24
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding pause and resume, I have not tried this approach but I think
> > it may not be feasible. If your consumer no longer has the partition
> > assigned from which the record being processed was fetched, or even if
> > the partition is somehow assigned to the consumer again, you may still
> > not be able to do this and will see UNKNOWN_MEMBER_ID or
> > ILLEGAL_GENERATION_ID errors in the logs. Let me know if this approach
> > works for you; I will also try it then.
> >
> > Kafka never sends the same record to 2 consumers in the same consumer
> > group. If another consumer got the same record, it means the first
> > consumer no longer has that particular partition assigned to it. Even
> > if you want to commit from the first consumer you will not be able to,
> > and the commit will throw an exception. Even after increasing the
> > session timeout a rebalance can still occur. I see that Kafka sends a
> > metadata refresh request every 300000 ms (default), and even though
> > nothing changed (no new consumer, broker or partition) this refresh
> > generally triggers a rebalance (at least in my tests).
> >
> > Calling commitSync renews the session and keeps the consumer alive.
> > commitSync is a blocking operation, so you may not want to call it for
> > each record processed. You can try calling commitSync(offsets) just
> > before starting to process a record, but only if, let's say, 75% of the
> > configured session time has elapsed. This will keep your consumer alive
> > during rare longer processing times and will also not commit every
> > record. But as I said earlier, this will not guarantee that a rebalance
> > will not happen. A metadata refresh or other change may still trigger a
> > rebalance, but if you take the above approach then at least a rebalance
> > will not occur because of a session timeout during longer processing.
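The 75% rule above can be captured in a small predicate: commit (which renews the session) only once most of the session timeout has elapsed since the last commit. A standalone sketch; the class and method names are illustrative:

```java
public class CommitThrottle {
    private final long sessionTimeoutMs;
    private long lastCommitAtMs;

    CommitThrottle(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastCommitAtMs = nowMs;
    }

    // Commit only once 75% of the session timeout has passed since the last
    // commit; in the real loop a true return would be followed by
    // consumer.commitSync(offsets).
    boolean shouldCommit(long nowMs) {
        if (nowMs - lastCommitAtMs >= (sessionTimeoutMs * 3) / 4) {
            lastCommitAtMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CommitThrottle throttle = new CommitThrottle(30_000, 0);
        System.out.println(throttle.shouldCommit(10_000)); // false: 33% elapsed
        System.out.println(throttle.shouldCommit(23_000)); // true: 75% elapsed
        System.out.println(throttle.shouldCommit(24_000)); // false: timer reset
    }
}
```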
> >
> > If you maintain offsets outside of Kafka and configure consumers to
> > coordinate with each other through these external offsets, then you can
> > skip processing duplicate records even if Kafka sends the same records
> > twice. Through these external offsets you will have to devise a way to
> > skip a record already processed by another consumer, or to wait if the
> > same record is being processed by another consumer.
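One way to devise such a skip is an atomic claim on the record's coordinates in the shared store: the first consumer to claim a (partition, offset) pair wins, and redeliveries are skipped. A minimal in-memory sketch using a ConcurrentHashMap as a stand-in for the external store (a real deployment would use a database or similar):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ExternalOffsetClaim {
    // Stand-in for an external shared store, keyed by "topic-partition:offset".
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Atomically claim a record; returns true only for the first consumer,
    // so duplicates delivered after a rebalance are skipped.
    boolean claim(String topicPartition, long offset, String consumerId) {
        String key = topicPartition + ":" + offset;
        return store.putIfAbsent(key, consumerId) == null;
    }

    public static void main(String[] args) {
        ExternalOffsetClaim claims = new ExternalOffsetClaim();
        System.out.println(claims.claim("mytopic-0", 42, "consumer-A")); // true
        // Same record redelivered to another consumer after a rebalance:
        System.out.println(claims.claim("mytopic-0", 42, "consumer-B")); // false
    }
}
```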
> >
> >
> > Regards,
> > Vinay Sharma
> >
> >
> >
> > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <
> > phil.luckhurst@encycle.com>
> > wrote:
> >
> > > Thanks for all the responses. Unfortunately it seems that currently
> > > there is no foolproof solution to this. It's not a problem with the
> > > stored offsets, as it happens even if I do a commitSync after each
> > > record is processed. It's the unprocessed records in the batch that
> > > get processed twice.
> > >
> > > I'm now taking the approach of trying to limit the possibility of a
> > > rebalance as much as possible by reducing the data returned by poll.
> > > I'm also using the pause, poll, resume pattern to ensure the
> > > consumer doesn't cause a rebalance if the processing loop takes
> > > longer than session.timeout.ms.
> > >
> > > Cheers,
> > > Phil
> > >
> > >
> > >
> > > On 21 Apr 2016 16:24, vinay sharma <vinsharma.tech@gmail.com>
> > > wrote:
> > > >Hi,
> > > >
> > > >By design Kafka does ensure it will not send the same record to
> > > >multiple consumers in the same consumer group. The issue arises
> > > >because of a rebalance while processing is going on and records are
> > > >not yet committed. In my view there are only 2 possible solutions:
> > > >1) As mentioned in the documentation, store offsets outside of Kafka
> > > >(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
> > > >This is a complete solution but will definitely add extra development
> > > >and also extra processing for each message. The problem may still
> > > >exist if, at the time of a crash, the consumer was out of sync with
> > > >both the external custom offset storage and the offsets stored in
> > > >Kafka.
> > > >2) As mentioned in the fix for defect 919
> > > >(https://issues.apache.org/jira/browse/KAFKA-919), set autocommit to
> > > >true. This will make Kafka commit fetched records before rebalancing.
> > > >The only drawback is that some records may never be processed if the
> > > >consumer crashes while processing records which were already marked
> > > >committed due to the rebalance.
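For option 2, autocommit is plain consumer configuration. A sketch of the relevant properties for the 0.9 consumer (the server, group id, and interval values here are placeholders):

```java
import java.util.Properties;

public class AutoCommitConfig {
    // Builds consumer properties with autocommit enabled, so fetched offsets
    // are committed automatically (including ahead of a rebalance), at the
    // cost of possibly marking unprocessed records committed after a crash.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");     // placeholder
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        // new KafkaConsumer<>(props) would be created with these settings.
        System.out.println(props.getProperty("enable.auto.commit")); // true
    }
}
```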
> > > >
> > > >Regards,
> > > >Vinay Sharma
> > >
> >
>
