nifi-users mailing list archives

From Nick Carenza <nick.care...@thecontrolgroup.com>
Subject Re: ConsumeKafka processor erroring when held up by full queue
Date Thu, 09 Feb 2017 18:19:49 GMT
That makes perfect sense. To be clear, is there any potential to lose
messages in this scenario?

On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <joe.witt@gmail.com> wrote:

> Yeah, this is probably a good case for using the pause concept in
> Kafka consumers.
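> 
> A minimal sketch of that pause concept, using the plain kafka-clients
> 0.10 consumer API (the topic name, properties, and the
> downstreamIsFull() check are illustrative; this is not NiFi's actual
> code):
> 
>     import java.util.Collections;
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
> 
>     public class PausingConsumer {
>         public static void main(String[] args) {
>             Properties props = new Properties();
>             props.put("bootstrap.servers", "localhost:9092");
>             props.put("group.id", "nifi-demo");
>             props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>             props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
> 
>             try (KafkaConsumer<String, String> consumer =
>                     new KafkaConsumer<>(props)) {
>                 consumer.subscribe(Collections.singletonList("my-topic"));
>                 while (true) {
>                     if (downstreamIsFull()) {
>                         // Stop fetching but keep calling poll(): heartbeats
>                         // continue, so the coordinator does not rebalance
>                         // our partitions away while we are held up.
>                         consumer.pause(consumer.assignment());
>                     } else {
>                         consumer.resume(consumer.assignment());
>                     }
>                     ConsumerRecords<String, String> records = consumer.poll(100);
>                     // hand records to the downstream queue here
>                 }
>             }
>         }
> 
>         // Hypothetical stand-in for a back-pressure check.
>         static boolean downstreamIsFull() { return false; }
>     }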
>
> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <bbende@gmail.com> wrote:
> > I believe you are running into this issue:
> >
> > https://issues.apache.org/jira/browse/NIFI-3189
> >
> > When back-pressure happens on the queue coming out of ConsumeKafka,
> > this can last for longer than session.timeout.ms, and when the
> > processor resumes executing, it receives this error on the first
> > execution. We should be able to implement some type of keep-alive so
> > that even when the processor is not executing, there is a background
> > thread, or some way of keeping the connections alive.
> >
> > I believe any user-defined properties in the processor get passed to
> > the Kafka consumer, so you could add "session.timeout.ms" with a much
> > higher value as a possible workaround.
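> >
> > For example (the exact value is illustrative), a user-defined property
> > on the processor:
> >
> >     session.timeout.ms = 60000
> >
> > which, if it is passed through, would reach the consumer config the
> > same way as:
> >
> >     props.put("session.timeout.ms", "60000");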
> >
> > Thanks,
> >
> > Bryan
> >
> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <ijokarumawak@gmail.com> wrote:
> >> Hello Nick,
> >>
> >> First, I assume "had a queue back up" means a queue applying
> >> back-pressure. Sorry if I misunderstood.
> >>
> >> I tried to reproduce this with the following flow:
> >> ConsumeKafka_0_10
> >>   -- success: Back Pressure Object Threshold = 10
> >>     -- UpdateAttribute (Stopped)
> >>
> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
> >> The result: when NiFi received the 10th message, back-pressure was
> >> enabled on the success relationship.
> >> When I published the 11th message, NiFi didn't do anything.
> >> This is expected behavior: while the downstream connection is
> >> back-pressured, the processor won't be scheduled.
> >>
> >> After I started UpdateAttribute and the queued flow files went
> >> through, ConsumeKafka was executed again and received the 11th
> >> message.
> >>
> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code:
> >> those warning and error messages are logged because NiFi received a
> >> KafkaException when it tried to commit offsets to Kafka.
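> >>
> >> The pattern is roughly the following sketch (not the actual
> >> ConsumerLease code; the logging is simplified):
> >>
> >>     import org.apache.kafka.clients.consumer.CommitFailedException;
> >>     import org.apache.kafka.clients.consumer.KafkaConsumer;
> >>     import org.apache.kafka.common.KafkaException;
> >>
> >>     class OffsetCommitter {
> >>         static void commitOffsets(KafkaConsumer<?, ?> consumer) {
> >>             try {
> >>                 consumer.commitSync();
> >>             } catch (CommitFailedException e) {
> >>                 // The group already rebalanced, so the offsets were not
> >>                 // committed even though the records were processed:
> >>                 // duplicates are likely on re-delivery.
> >>                 System.err.println("WARN duplicates likely: " + e);
> >>             } catch (KafkaException e) {
> >>                 // Any other Kafka failure: give up the consumer (the
> >>                 // "lease" in NiFi's terms) and let it be recreated.
> >>                 System.err.println("ERROR closing consumer: " + e);
> >>                 consumer.close();
> >>             }
> >>         }
> >>     }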
> >>
> >> Was there anything in the Kafka server logs? I suspect something
> >> happened on the Kafka server side.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> >> <nick.carenza@thecontrolgroup.com> wrote:
> >>> Hey team, I have a ConsumeKafka_0_10 running which normally operates
> >>> without problems. I had a queue back up due to a downstream processor
> >>> and I started getting these bulletins.
> >>>
> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates
> >>> are likely as we were able to commit the process session but received
> >>> an exception from Kafka while committing offsets.
> >>>
> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception
> >>> while interacting with Kafka so will close the lease
> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> >>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
> >>> cannot be completed since the group has already rebalanced and
> >>> assigned the partitions to another member. This means that the time
> >>> between subsequent calls to poll() was longer than the configured
> >>> session.timeout.ms, which typically implies that the poll loop is
> >>> spending too much time message processing. You can address this either
> >>> by increasing the session timeout or by reducing the maximum size of
> >>> batches returned in poll() with max.poll.records.
> >>>
> >>> My max.poll.records is set to 10000 on my consumer and
> >>> session.timeout.ms is the default 10000 on the server.
> >>>
> >>> Since there is no such thing as coincidences, I believe this has to
> >>> do with it not being able to push received messages to the downstream
> >>> queue.
> >>>
> >>> If my flow is backed up, shouldn't the ConsumeKafka processor avoid
> >>> throwing errors and instead continue to heartbeat with the Kafka
> >>> server, resuming consumption once it can commit to the downstream
> >>> queue?
> >>>
> >>> Might I have the server or consumer misconfigured to handle this
> >>> scenario, or should the consumer not be throwing this error?
> >>>
> >>> Thanks,
> >>> - Nick
>
