nifi-users mailing list archives

From Joe Witt <joe.w...@gmail.com>
Subject Re: ConsumeKafka processor erroring when held up by full queue
Date Thu, 09 Feb 2017 15:16:53 GMT
yeah this is probably a good case/cause for use of the pause concept
in kafka consumers.
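
A minimal sketch of that pause/resume idea, assuming the standard Kafka Java consumer API (0.10-era signatures). The topic name and the downstreamIsBackPressured()/handOffToFlow() helpers are hypothetical stand-ins for NiFi's back-pressure check and flow file hand-off, not actual NiFi code:

  import java.util.Collections;
  import java.util.Set;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  public class PausingConsumerSketch {

      // Hypothetical check standing in for "the outbound NiFi queue is full".
      static boolean downstreamIsBackPressured() { return false; }

      // Hypothetical hand-off of a record into the flow.
      static void handOffToFlow(ConsumerRecord<byte[], byte[]> record) { }

      public static void run(KafkaConsumer<byte[], byte[]> consumer) {
          consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic name
          while (true) {
              Set<TopicPartition> assigned = consumer.assignment();
              if (downstreamIsBackPressured()) {
                  // pause() stops fetching from the assigned partitions, but poll()
                  // is still called below, so the consumer keeps participating in
                  // the group instead of timing out after session.timeout.ms
                  consumer.pause(assigned);
              } else {
                  consumer.resume(assigned);
              }
              ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
              for (ConsumerRecord<byte[], byte[]> record : records) {
                  handOffToFlow(record);
              }
          }
      }
  }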

On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbende@gmail.com> wrote:
> I believe you are running into this issue:
>
> https://issues.apache.org/jira/browse/NIFI-3189
>
> When back-pressure happens on the queue coming out of ConsumeKafka,
> it can last longer than session.timeout.ms, and when the processor
> resumes executing it receives this error on the first execution. We
> should be able to implement some type of keep-alive so that even when
> the processor is not executing, a background thread or some other
> mechanism keeps the connection alive.
>
> I believe any user-defined properties on the processor get passed to
> the Kafka consumer, so you could add "session.timeout.ms" with a much
> higher value as a possible workaround.
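
A rough sketch of the consumer configuration that would result from adding a user-defined "session.timeout.ms" property on the processor. The broker address, group id, and the 60000 ms value are assumptions, not recommendations; note that the broker-side group.max.session.timeout.ms caps how high this can be set:

  import java.util.Properties;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
  props.put("group.id", "nifi-consumer-group");       // assumed group id
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  props.put("session.timeout.ms", "60000");           // raised from the 10000 ms default
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);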
>
> Thanks,
>
> Bryan
>
> On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <ijokarumawak@gmail.com> wrote:
>> Hello Nick,
>>
>> First, I assume "had a queue back up" means a queue applying
>> back-pressure. Sorry if that was a different meaning.
>>
>> I tried to reproduce this with the following flow:
>> ConsumeKafka_0_10
>>   -- success: Back Pressure Object Threshold = 10
>>     -- UpdateAttribute (Stopped)
>>
>> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>> The result was that when NiFi received the 10th message, back-pressure
>> was enabled on the success relationship.
>> When I published the 11th message, NiFi didn't do anything.
>> This is expected behavior: because the downstream connection is
>> back-pressured, the processor won't be scheduled.
>>
>> After I started UpdateAttribute and the queued flow files went
>> through, ConsumeKafka was executed again and received the 11th
>> message.
>>
>> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
>> those warning and error messages are logged because NiFi received a
>> KafkaException when it tried to commit offsets to Kafka.
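
For illustration (not the actual ConsumerLease code), at the Kafka client level that failure looks roughly like this: if the consumer sat idle longer than session.timeout.ms, the coordinator has already rebalanced its partitions away, and the offset commit is rejected.

  import org.apache.kafka.clients.consumer.CommitFailedException;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class CommitAfterIdleSketch {
      public static void commitWithHandling(KafkaConsumer<byte[], byte[]> consumer) {
          try {
              consumer.commitSync();
          } catch (CommitFailedException e) {
              // The partitions were reassigned to another group member while this
              // consumer was idle; records already handed downstream may be
              // re-delivered, hence the "Duplicates are likely" warning in the
              // original bulletin.
              System.err.println("Offset commit failed after rebalance: " + e.getMessage());
          }
      }
  }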
>>
>> Was there anything in the Kafka server logs? I suspect something
>> happened on the Kafka server side.
>>
>> Thanks,
>> Koji
>>
>> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
>> <nick.carenza@thecontrolgroup.com> wrote:
>>> Hey team, I have a ConsumeKafka_0_10 running which normally operates without
>>> problems. I had a queue back up due to a downstream processor and I started
>>> getting these bulletins.
>>>
>>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates are
>>> likely as we were able to commit the process session but received an
>>> exception from Kafka while committing offsets.
>>>
>>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception while
>>> interacting with Kafka so will close the lease
>>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>> cannot be completed since the group has already rebalanced and assigned the
>>> partitions to another member. This means that the time between subsequent
>>> calls to poll() was longer than the configured session.timeout.ms, which
>>> typically implies that the poll loop is spending too much time message
>>> processing. You can address this either by increasing the session timeout or
>>> by reducing the maximum size of batches returned in poll() with
>>> max.poll.records.
>>>
>>> My max.poll.records is set to 10000 on my consumer and session.timeout.ms is
>>> the default 10000 on the server.
>>>
>>> Since there is no such thing as coincidences, I believe this has to do with
>>> it not being able to push received messages to the downstream queue.
>>>
>>> If my flow is backed up, shouldn't the ConsumeKafka processor continue to
>>> heartbeat with the Kafka server rather than throwing errors, and resume
>>> consuming once it can commit to the downstream queue?
>>>
>>> Might I have the server or consumer misconfigured to handle this scenario or
>>> should the consumer not be throwing this error?
>>>
>>> Thanks,
>>> - Nick
