kafka-users mailing list archives

From Jaikiran Pai <jai.forums2...@gmail.com>
Subject Re: 0.10.1.0 - commitSync() doesn't contribute to "aliveness" of a consumer?
Date Tue, 01 Nov 2016 14:56:16 GMT
For reference, here's the complete stack trace (it's triggered when 
commitSync gets called):

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.0.jar!/:na]
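
As the exception message suggests, the two knobs here are max.poll.records 
and max.poll.interval.ms (the latter was introduced in 0.10.1.0 by KIP-62). 
A minimal sketch of how one might tune them; the broker address, group id 
and the values below are illustrative assumptions, not recommendations:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumption
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // Fewer records per poll() means less work (and fewer blocking
    // commitSync calls) between consecutive polls.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");      // illustrative
    // Allow a longer gap between poll() calls before the consumer
    // is considered failed and its partitions are reassigned.
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // illustrative
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);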



-Jaikiran
On Tuesday 01 November 2016 07:39 PM, Jaikiran Pai wrote:
> We are using Kafka 0.10.1.0 (server) and the new Java client API for 
> consumers. One of the issues we have been running into is that the 
> consumer is considered "dead" by the coordinator because of a lack of 
> activity within a specific period of time, even though the consumer 
> is in fact still alive. We see exceptions like these:
>
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
> be completed since the group has already rebalanced and assigned the 
> partitions to another member. This means that the time between 
> subsequent calls to poll() was longer than the configured 
> max.poll.interval.ms, which typically implies that the poll loop is 
> spending too much time message processing. You can address this either 
> by increasing the session timeout or by reducing the maximum size of 
> batches returned in poll() with max.poll.records.
>
>
> I understand what that exception means and what we could potentially 
> do to address it (setting a low value for max.poll.records is one 
> option). Before changing the max.poll.records value in our setup, I 
> would like to hear/understand a bit more about this, so that I know 
> it is the right fix given how we have implemented our consumers. 
> Essentially, our consumer code is this:
>
>     while (!stopped) {
>         try {
>             final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
>             for (final TopicPartition topicPartition : consumerRecords.partitions()) {
>                 if (stopped) {
>                     break;
>                 }
>                 for (final ConsumerRecord<K, V> consumerRecord
>                         : consumerRecords.records(topicPartition)) {
>                     final long previousOffset = consumerRecord.offset();
>                     // commit the offset and then pass on the message
>                     // for processing (in a separate thread)
>                     consumer.commitSync(Collections.singletonMap(topicPartition,
>                             new OffsetAndMetadata(previousOffset + 1)));
>
>                     this.executor.execute(new Runnable() {
>                         @Override
>                         public void run() {
>                             // process the ConsumerRecord
>                         }
>                     });
>                 }
>             }
>         } catch (Exception e) {
>             // log the error and continue
>             continue;
>         }
>     }
>
>
>
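A side note on the loop above: since commitSync is a blocking round trip 
per record, one common variation is to commit once per partition batch 
rather than once per record, which cuts the number of blocking calls 
between poll()s. A minimal sketch of that variation, keeping the same 
types and fields as the code above (sketch only, not our actual code):

    for (final TopicPartition topicPartition : consumerRecords.partitions()) {
        final List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
        for (final ConsumerRecord<K, V> consumerRecord : records) {
            // hand each record off for processing as before
        }
        // commit the last offset of this partition's batch in one call
        final long lastOffset = records.get(records.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(topicPartition,
                new OffsetAndMetadata(lastOffset + 1)));
    }
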
> As you can see, the only thing that happens in the main thread the 
> consumer is polling on is a commitSync for each record returned in 
> that batch of poll(). I understand commitSync is blocking, so each 
> commitSync invocation can potentially add to the time between poll() 
> calls. One option is using commitAsync, but we need to evaluate 
> whether it has other issues for our use case.
>
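On the commitAsync option, a minimal sketch of what the non-blocking 
variant of the commit above might look like (the callback body is an 
assumption, not a recommendation):

    consumer.commitAsync(Collections.singletonMap(topicPartition,
            new OffsetAndMetadata(previousOffset + 1)),
            new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null) {
                        // log and decide whether to fall back to commitSync
                    }
                }
            });

One caveat to weigh: unlike commitSync, commitAsync does not retry a 
failed commit; a failure is only papered over by a later successful commit.
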
> But what I was wondering is: why doesn't commitSync contribute to the 
> liveness of the consumer? If it did, I see no reason why this consumer 
> would ever be considered dead and the above message logged. Does anyone 
> see a problem with the code above?
>
> P.S: We use the default session timeout value in the consumer configs 
> (i.e. we don't set any specific value)
>
>
> -Jaikiran

