nifi-users mailing list archives

From Koji Kawamura <ijokaruma...@gmail.com>
Subject Re: ConsumeKafka processor erroring when held up by full queue
Date Thu, 09 Feb 2017 13:42:37 GMT
Hello Nick,

First, I assume "had a queue back up" means a queue that is applying
back pressure. Sorry if you meant something different.

I tried to reproduce this with the following flow:
ConsumeKafka_0_10
  -- success: Back Pressure Object Threshold = 10
    -- UpdateAttribute (Stopped)

Then I used ./bin/kafka-console-producer.sh to send 11 messages.
When NiFi received the 10th message, back pressure was enabled on
the success relationship. When I published the 11th message, NiFi
didn't do anything. This is expected behavior: while the downstream
connection is back-pressured, the processor won't be scheduled.
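
In case it is useful for reproducing the test programmatically,
here is a minimal Java producer roughly equivalent to the console
script I used (the broker address and topic name below are
placeholders, not the values from my test):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackPressureTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Send 11 messages; with Back Pressure Object Threshold = 10,
            // the 11th should remain in Kafka until the NiFi queue drains.
            for (int i = 1; i <= 11; i++) {
                producer.send(new ProducerRecord<>("test-topic", "message-" + i));
            }
        }
    }
}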

After I started UpdateAttribute and the queued flow files went
through, ConsumeKafka was executed again and received the 11th
message.

Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
those warning and error messages are logged because NiFi received a
KafkaException when it tried to commit offsets to Kafka.
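
For illustration, this is a minimal sketch of that commit pattern
using the plain Kafka 0.10 consumer API (this is not NiFi's actual
ConsumerLease code; the broker address, group id, and topic name
are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitFailureExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // process each record ...
                }
                try {
                    consumer.commitSync();
                } catch (CommitFailedException e) {
                    // Thrown when the group has already rebalanced, e.g.
                    // because the time between poll() calls exceeded
                    // session.timeout.ms. This is the exception behind the
                    // "Duplicates are likely..." bulletin; uncommitted
                    // records are redelivered to the new assignee.
                    break;
                }
            }
        }
    }
}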

Was there anything in the Kafka server logs? I suspect something
happened on the Kafka server side.

Thanks,
Koji

On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
<nick.carenza@thecontrolgroup.com> wrote:
> Hey team, I have a ConsumeKafka_0_10 running which normally operates without
> problems. I had a queue back up due to a downstream processor and I started
> getting these bulletins.
>
> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates are
> likely as we were able to commit the process session but received an
> exception from Kafka while committing offsets.
>
> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception while
> interacting with Kafka so will close the lease
> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
> cannot be completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured session.timeout.ms, which
> typically implies that the poll loop is spending too much time message
> processing. You can address this either by increasing the session timeout or
> by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>
> My max.poll.records is set to 10000 on my consumer and session.timeout.ms is
> the default 10000 on the server.
>
> Since there is no such thing as coincidences, I believe this has to do with
> it not being able to push received messages to the downstream queue.
>
> If my flow is backed up, shouldn't the ConsumeKafka processor continue to
> heartbeat with the Kafka server instead of throwing errors, and resume
> consuming once it can commit to the downstream queue?
>
> Might I have the server or consumer misconfigured for this scenario, or
> should the consumer not be throwing this error?
>
> Thanks,
> - Nick
