nifi-users mailing list archives

From Bryan Bende <>
Subject Re: ConsumeKafka processor erroring when held up by full queue
Date Thu, 09 Feb 2017 14:49:49 GMT
I believe you are running into this issue:

When back-pressure happens on the queue coming out of ConsumeKafka,
this can last for longer than, and when the
processor resumes executing, it receives this error on the first
execution. We should be able to implement some type of keep-alive so
that even when the processor is not executing, there is a background
thread, or some way of keeping the connections alive.

I believe any user-defined properties in the processor get passed to
the Kafka consumer, so I believe you could add ""
and set a much higher value as a possible workaround.
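
As a rough illustration of the pass-through idea above, here is a
minimal sketch in plain Java. It is not NiFi's actual code: the class
and method names are hypothetical, and the key "session.timeout.ms" is
an assumption taken from the "session timeout" wording in the error
text, not necessarily the property meant above. The value 120000 and
the group id are purely illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: overlay user-defined processor properties on top of
// base consumer settings before they are handed to a Kafka consumer.
public class ConsumerPropsSketch {

    // Copies the base settings first, then the user-defined ones, so a
    // user-defined value replaces a base value that has the same key.
    public static Properties buildConsumerProperties(Map<String, String> base,
                                                     Map<String, String> userDefined) {
        Properties props = new Properties();
        base.forEach(props::setProperty);
        userDefined.forEach(props::setProperty);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("group.id", "example-group");   // hypothetical group id
        base.put("max.poll.records", "10000");   // value quoted in the thread

        Map<String, String> userDefined = new LinkedHashMap<>();
        // Assumed key and illustrative value; see the note above.
        userDefined.put("session.timeout.ms", "120000");

        Properties props = buildConsumerProperties(base, userDefined);
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

Whether a larger timeout is accepted also depends on the broker
configuration, so treat this only as a sketch of where the value goes.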



On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <> wrote:
> Hello Nick,
> First, I assume "had a queue back up" means that back-pressure was
> applied to a queue. Sorry if you meant something different.
> I tried to reproduce it with the following flow:
> ConsumeKafka_0_10
>   -- success: Back Pressure Object Threshold = 10
>     -- UpdateAttribute (Stopped)
> Then I used ./bin/ to send 11 messages.
> The result was that when NiFi received the 10th message, back-pressure
> was enabled on the success relationship.
> When I published the 11th message, NiFi didn't do anything.
> This is expected behavior: because the downstream connection is
> back-pressured, the processor won't be scheduled.
> After I started UpdateAttribute and the queued flow files went
> through, ConsumeKafka was executed again and received the 11th
> message.
> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
> those warning and error messages are logged because NiFi received a
> KafkaException when it tried to commit offsets to Kafka.
> Was there anything in the Kafka server logs? I suspect something
> happened on the Kafka server side.
> Thanks,
> Koji
> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> <> wrote:
>> Hey team, I have a ConsumeKafka_0_10 running which normally operates without
>> problems. I had a queue back up due to a downstream processor and I started
>> getting these bulletins.
>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates are
>> likely as we were able to commit the process session but received an
>> exception from Kafka while committing offsets.
>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception while
>> interacting with Kafka so will close the lease
>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> cannot be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured, which
>> typically implies that the poll loop is spending too much time message
>> processing. You can address this either by increasing the session timeout or
>> by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> My max.poll.records is set to 10000 on my consumer and is
>> the default 10000 on the server.
>> Since there is no such thing as coincidences, I believe this has to do with
>> it not being able to push received messages to the downstream queue.
>> If my flow is backed up, I expect the ConsumeKafka processor not to throw
>> errors but continue to heartbeat with the Kafka server and resume consuming
>> once it can commit to the downstream queue?
>> Might I have the server or consumer misconfigured to handle this scenario or
>> should the consumer not be throwing this error?
>> Thanks,
>> - Nick
