nifi-dev mailing list archives

From Aldrin Piri <aldrinp...@gmail.com>
Subject Re: NiFI Custom Plugin - Yield Duration Ignored
Date Tue, 17 Nov 2015 19:47:52 GMT
Michael,

In response to your question about suspending the flow on error: there is
no explicit functionality for this, largely because other flows/paths
through the system may need to keep operating even when one component has
an issue.  However, the mechanism NiFi does provide is the notion of back
pressure on any connection [1].

By establishing back pressure, whether by FlowFile count or data size, one
can prevent additional data from flowing into a given connection or
queue.  In your particular example, we may not want more than 100 items
queued up to this problematic processor.  Once that limit is hit, the
Flow Controller would prevent any immediately preceding processors in the
flow path from executing and enqueuing additional items.  With back
pressure applied along a path, it is possible to prevent the introduction
of more data all the way up to the source at which it is first brought
into NiFi.
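
As a rough illustration of the count-based threshold described above, here
is a toy model in plain Java. It is not NiFi code: the Connection class,
runUpstream method, and the one-item-per-run behaviour are all assumptions
made for the sketch. In NiFi itself the threshold is configured on the
connection and checked before the upstream processor is scheduled.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of a count-based back pressure threshold. Not NiFi code. */
public class BackPressureSketch {

    /** Hypothetical stand-in for a connection's queue with an object-count threshold. */
    static final class Connection {
        private final Queue<String> queue = new ArrayDeque<>();
        private final int threshold;

        Connection(int threshold) {
            this.threshold = threshold;
        }

        /** True once the queued item count has reached the threshold. */
        boolean isFull() {
            return queue.size() >= threshold;
        }

        void enqueue(String item) {
            queue.add(item);
        }

        String poll() {
            return queue.poll();
        }

        int size() {
            return queue.size();
        }
    }

    /**
     * Simulates scheduling an upstream processor `attempts` times.
     * A run is skipped whenever the downstream connection is full.
     * Returns the number of items actually produced.
     */
    static int runUpstream(Connection downstream, int attempts) {
        int produced = 0;
        for (int i = 0; i < attempts; i++) {
            if (downstream.isFull()) {
                continue; // back pressure: the upstream processor is not run
            }
            downstream.enqueue("item-" + i);
            produced++;
        }
        return produced;
    }

    public static void main(String[] args) {
        Connection toProblemProcessor = new Connection(100);

        // The downstream processor is stuck, so nothing drains the queue.
        int produced = runUpstream(toProblemProcessor, 250);
        System.out.println("produced=" + produced + " queued=" + toProblemProcessor.size());

        // Once the downstream processor recovers and drains 10 items,
        // the upstream processor may run again until the queue refills.
        for (int i = 0; i < 10; i++) {
            toProblemProcessor.poll();
        }
        int more = runUpstream(toProblemProcessor, 250);
        System.out.println("produced after drain=" + more + " queued=" + toProblemProcessor.size());
    }
}
```

With a threshold of 100, only 100 of the 250 attempted runs produce an
item; the rest are skipped until the downstream side drains the queue.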

[1] https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#settings

On Tue, Nov 17, 2015 at 1:22 PM, Michael De Courci <mdecourci@gmail.com>
wrote:

> I have attached the code for you to review. I think it is checking for
> null in a base class, ErrorHandlerAdapterProcessor.
> It is written in Scala, but is not that different from a Java implementation.
>
>
> Michael
>
> > On 17 Nov 2015, at 18:16, Mark Payne <markap14@hotmail.com> wrote:
> >
> > Michael,
> >
> > Not a problem. It looks like you are calling session.read() but passing
> > it a null FlowFile.
> >
> > My guess is that in your processor you're doing something like this:
> >
> > FlowFile flowFile = session.get();
> > ...
> > session.read(flowFile, new InputStreamCallback() {...});
> >
> > It's important to always check whether the FlowFile returned from
> > session.get() is null. Because NiFi allows for multi-threading and allows
> > Processors to be run without incoming connections, session.get() may well
> > return null.
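
A minimal sketch of the guard described above, using a stand-in session so
that it runs without the NiFi API. StubSession and the boolean return value
are assumptions for illustration only; in a real processor the guard is a
null check on the FlowFile returned by session.get(), followed by an early
return from onTrigger.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Illustrates the null check on session.get() with a stub. Not NiFi code. */
public class NullCheckSketch {

    /** Hypothetical stand-in for a session: get() returns null when nothing is queued. */
    static final class StubSession {
        private final Queue<String> incoming = new ArrayDeque<>();

        void offer(String flowFileName) {
            incoming.add(flowFileName);
        }

        String get() {
            return incoming.poll(); // null when the queue is empty
        }
    }

    /**
     * Returns true if a FlowFile was processed, false if there was no input.
     * (A real onTrigger returns void; the boolean is only for demonstration.)
     */
    static boolean onTrigger(StubSession session) {
        String flowFile = session.get();
        if (flowFile == null) {
            return false; // nothing queued for this run: return before any read
        }
        // Only touch the FlowFile after the null check has passed.
        System.out.println("processing " + flowFile);
        return true;
    }

    public static void main(String[] args) {
        StubSession session = new StubSession();
        System.out.println(onTrigger(session)); // no input queued yet
        session.offer("162700882642440.json");
        System.out.println(onTrigger(session)); // one FlowFile available
    }
}
```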
> >
> > If that is not the case, please let us know, and we will continue to
> > help you investigate.
> >
> > Thanks!
> >
> > -Mark
> >
> >
> >> On Nov 17, 2015, at 11:42 AM, Michael De Courci <mdecourci@gmail.com> wrote:
> >>
> >> Hi
> >> Thanks for your speedy reply.
> >> I have just done a retest and changed the code to handle exceptions by
> >> calling context.yield
> >>
> >> and I now get the following stack trace repeating forever; my custom
> >> plugin still never yields.
> >>
> >> What I am trying to develop is a stable NiFi plugin that does not consume
> >> resources when an exception occurs. Can you help with that also?
> >>
> >> 015-11-17 16:26:50,476 INFO [NiFi Web Server-26] c.s.j.s.i.application.WebApplicationImpl Initiating Jersey application, version 'Jersey: 1.19 02/11/2015 03:25 AM'
> >> 2015-11-17 16:27:08,462 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@54c2b852 // Another save pending = false
> >> 2015-11-17 16:27:17,676 INFO [pool-35-thread-4] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=5135b041-66ff-43ab-9e71-f1848dcef4f6] to run with 1 threads
> >> 2015-11-17 16:27:17,676 INFO [pool-35-thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] to run with 1 threads
> >> 2015-11-17 16:27:17,676 INFO [pool-35-thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] to run with 1 threads
> >> 2015-11-17 16:27:17,691 INFO [Timer-Driven Process Thread-2] o.a.nifi.processors.standard.GetFile GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] added StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default, section=1], offset=0, length=15658],offset=0,name=162700882642440.json,size=15658] to flow
> >> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5] c.b.n.p.SampleErrorHandlerProcessor [SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e]] --- Encountered process exception
> >> org.apache.nifi.processor.exception.FlowFileHandlingException: null is not known in this session (StandardProcessSession[id=35])
> >>       at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2345) ~[nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1773) ~[nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at com.barclays.nifi.processor.SampleErrorHandlerProcessor.doTrigger(SampleErrorHandlerProcessor.scala:49) [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>       at com.barclays.nifi.processor.ErrorHandlerAdapterProcessor$class.onTrigger(ErrorHandlerAdapterProcessor.scala:24) ~[poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>       at com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:26) [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-0.3.0.jar:0.3.0]
> >>       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) [nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
> >>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_51]
> >>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_51]
> >>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_51]
> >>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_51]
> >>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_51]
> >>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_51]
> >>       at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
> >> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5] c.b.n.p.SampleErrorHandlerProcessor SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default, section=1], offset=0, length=15658],offset=0,name=162700882642440.json,size=15658] is not known in this session
> >>
> >>
> >>> On 17 Nov 2015, at 16:19, Mark Payne <markap14@hotmail.com> wrote:
> >>>
> >>> Michael,
> >>>
> >>> Hello. First, thanks for your interest in NiFi! I'm very happy to see
> >>> that you're already starting to dig in pretty deep here!
> >>>
> >>> I admit that the wording here can be a little confusing. Let me try to
> >>> explain what's going on here and how yielding works. When you are
> >>> developing a Processor in NiFi, especially if reaching out to some
> >>> external resource, a common occurrence is to realize that there is no
> >>> work that can really be done at this point in time (for example, because
> >>> the remote resource is not accessible). In that case, the processor can
> >>> call ProcessContext.yield() and as a result, the framework will stop
> >>> scheduling that processor to run for the configured amount of time, so
> >>> that we aren't wasting resources when we know there is nothing that we
> >>> can accomplish. This is where the configured Yield Duration comes in.
> >>>
> >>> There is, however, a concept of "administratively yielding" a processor.
> >>> The idea here is that the Processor throws some unexpected exception.
> >>> Since the Exception was not of type ProcessException, the framework
> >>> considers this a bug and will perform an administrative yield to avoid
> >>> continually triggering a processor that may be in a bad state.
> >>>
> >>> Perhaps using the term "administratively yielding" was a bad idea, as
> >>> it can clearly lead to confusion. If you feel that something is not
> >>> behaving properly, though, or you believe that the user experience can
> >>> be improved in some way, please do let us know, as we strive to
> >>> continually improve the user experience.
> >>>
> >>> Thanks
> >>> -Mark
> >>>
> >>>
> >>>> On Nov 17, 2015, at 10:34 AM, Michael De Courci <mdecourci@gmail.com> wrote:
> >>>>
> >>>> FYI
> >>>>
> >>>>> Begin forwarded message:
> >>>>>
> >>>>> From: Michael De Courci <mdecourci@googlemail.com>
> >>>>> Date: 16 November 2015 at 15:41:27 GMT
> >>>>> To: users@nifi.apache.org, Michael.DeCourci@barclayscapital.com, Kefah Issa <kefah.issa@barclays.com>
> >>>>> Subject: NiFI Custom Plugin - Yield Duration Ignored
> >>>>>
> >>>>> I wrote a custom NAR plugin to verify NiFi exception handling
> >>>>> (version: nifi-0.3.0).
> >>>>>
> >>>>> The plugin was written to always throw a RuntimeException for any
> >>>>> exception, including ProcessException.
> >>>>>
> >>>>> I created a simple flow: GetFile->MyPlugin->PutFile
> >>>>>
> >>>>> and set the Yield Duration to 60 seconds.
> >>>>>
> >>>>> The log shows that the yield duration is always 1 sec.
> >>>>>
> >>>>> I debugged the NiFi framework code and see that the yield duration
> >>>>> always defaults to 1 sec. See:
> >>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent
> >>>>>
> >>>>> Setting the duration updates the class
> >>>>> org.apache.nifi.controller.StandardProcessorNode
> >>>>> but this does not communicate the change to TimerDrivenSchedulingAgent.
> >>>>> Evidence from log;
> >>>>>
> >>>>> 80eb-552a-471c-8ffd-f2509d71fd2e] failed to process session due to java.lang.RuntimeException: java.lang.RuntimeException
> >>>>>
> >>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7] c.b.n.p.SampleErrorHandlerProcessor SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] Processor Administratively Yielded for 1 sec due to processing failure
> >>>>>
> >>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] due to uncaught Exception: java.lang.RuntimeException
> >>>>>
> >>>>> 2015-11-15 16:17:58,419 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask
> >>>>>
> >>>>> java.lang.RuntimeException: null
> >>>>>
> >>>>>     at com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:103) ~[na:na]
> >>>>>
> >>>>>
> >>>>>
> >>>>> With Kind Regards
> >>>>> Michael de Courci
> >>>>
> >>>
> >>
> >
>
>
>
