kafka-users mailing list archives

From Jaikiran Pai <jai.forums2...@gmail.com>
Subject 0.10.1.0 - commitSync() doesn't contribute to "aliveness" of a consumer?
Date Tue, 01 Nov 2016 14:09:14 GMT
We are using Kafka 0.10.1.0 (server) and the new Java client API for
consumers. One of the issues we have been running into is that the
consumer is considered "dead" by the coordinator because of a lack of
activity within a specific period of time. In reality, the consumer is
still alive. We see exceptions like these:


org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.


I understand what that exception means and what we could potentially do
to address it (setting a low value for max.poll.records is one option).
Before changing the max.poll.records value in our setup, I would like to
understand a bit more about this, so that I know it is the right fix for
the way we have implemented our consumers. Essentially, our consumer
code is this:

    while (!stopped) {
        try {
            final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
            for (final TopicPartition topicPartition : consumerRecords.partitions()) {
                if (stopped) {
                    break;
                }
                for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                    final long previousOffset = consumerRecord.offset();
                    // commit the offset and then pass on the message
                    // for processing (in a separate thread)
                    consumer.commitSync(Collections.singletonMap(topicPartition,
                            new OffsetAndMetadata(previousOffset + 1)));

                    this.executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            // process the ConsumerRecord
                        }
                    });
                }
            }
        } catch (Exception e) {
            // log the error and continue
            continue;
        }
    }



As you can see, the only thing that happens in the main thread (the one
the consumer polls on) is a commitSync for each record returned in that
batch. I understand commitSync is blocking, so each commitSync
invocation can add to the time between successive calls to poll(). One
option is to use commitAsync, but we need to evaluate whether it has
other issues in our use case.
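
A rough back-of-the-envelope sketch of how those per-record commits add
up between two calls to poll(). The class and method names are
hypothetical, and the values 500 (max.poll.records) and 300000
(max.poll.interval.ms) are assumed to be the 0.10.1.0 defaults rather
than taken from our actual setup. It also ignores the time spent handing
records to the executor.

```java
// Hypothetical helper: estimates time spent between two poll() calls
// when every record in a batch triggers one blocking commitSync.
final class PollBudget {
    private PollBudget() {
    }

    /** Approximate loop time: one blocking commit per record. */
    static long loopMillis(int recordsPerPoll, long commitMillis) {
        return (long) recordsPerPoll * commitMillis;
    }

    /** Largest average commit latency that keeps the loop within the poll interval. */
    static long maxCommitMillis(long maxPollIntervalMs, int recordsPerPoll) {
        return maxPollIntervalMs / recordsPerPoll;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // assumed default for max.poll.interval.ms
        int maxPollRecords = 500;          // assumed default for max.poll.records

        System.out.println("commit budget per record (ms): "
                + maxCommitMillis(maxPollIntervalMs, maxPollRecords));
        System.out.println("loop time at 50 ms per commit (ms): "
                + loopMillis(maxPollRecords, 50L));
    }
}
```

Under these assumed values, a full batch would only exceed the poll
interval if commits averaged more than 600 ms each, so it may be worth
measuring the actual commitSync latency before changing
max.poll.records.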

But what I was wondering is: why doesn't commitSync contribute to the
logic that decides whether the consumer is alive? If it did, I see no
reason why this consumer would ever be considered dead and the above
message logged. Does anyone see a problem with the code above?

P.S: We use the default session timeout value in the consumer configs 
(i.e. we don't set any specific value)
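
For reference, a minimal sketch of where the two settings named in the
exception would be configured. The values, the broker address and the
group id are hypothetical placeholders, not our real configuration. The
sketch uses plain java.util.Properties with the config names as strings
so that it runs without the Kafka client on the classpath; in a real
consumer these properties would be passed to the KafkaConsumer
constructor.

```java
import java.util.Properties;

// Hypothetical sketch: only shows where the settings from the exception live.
final class ConsumerSettingsSketch {
    private ConsumerSettingsSketch() {
    }

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.setProperty("group.id", "example-group");           // hypothetical group id
        // Fewer records per poll() means fewer blocking commits per loop iteration.
        props.setProperty("max.poll.records", "50");              // illustrative value
        // Maximum allowed delay between two successive poll() calls.
        props.setProperty("max.poll.interval.ms", "300000");      // illustrative value
        // session.timeout.ms is intentionally left unset, as in our setup.
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```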


-Jaikiran
