nifi-dev mailing list archives

From: Michael De Courci <mdecou...@googlemail.com>
Subject: Re: NiFI Custom Plugin - Yield Duration Ignored
Date: Thu, 19 Nov 2015 13:29:39 GMT
Hi
Attachments below


With Kind Regards
Michael de Courci

On 17 November 2015 at 22:40, Mark Payne <markap14@hotmail.com> wrote:

> Michael,
>
> I would be happy to take a look. It doesn't look like any attachment came
> through, though. It's quite possible
> that the Apache mail server stripped the attachment. Could you try pasting
> the code into the email? That may be
> the simplest approach. Or alternatively using something like pastebin.
>
> Thanks
> -Mark
>
>
> > On Nov 17, 2015, at 1:22 PM, Michael De Courci <mdecourci@gmail.com>
> wrote:
> >
> > I have attached the code for you to review. I think it is checking for
> null in a base class, ErrorHandlerAdapterProcessor.
> > It is written in Scala, but it is not that different from a Java
> implementation.
> >
> >
> > Michael
> >
> >
> >
> >
> >
> >
> >> On 17 Nov 2015, at 18:16, Mark Payne <markap14@hotmail.com> wrote:
> >>
> >> Michael,
> >>
> >> Not a problem. It looks like you are calling session.read() but passing
> it a null FlowFile.
> >>
> >> My guess is that in your processor you're doing something like this:
> >>
> >> FlowFile flowFile = session.get();
> >> ...
> >> session.read(flowFile, new InputStreamCallback() {...});
> >>
> >> It's important to always check if the FlowFile returned from
> session.get() is null or not. Because
> >> NiFi allows for multi-threading and allows Processors to be run without
> incoming connections,
> >> session.get() may well return null.
> >>
> >> If that is not the case, please let us know, and we will continue to
> help you investigate.
> >>
> >> Thanks!
> >>
> >> -Mark
> >>
> >>
> >>> On Nov 17, 2015, at 11:42 AM, Michael De Courci <mdecourci@gmail.com>
> wrote:
> >>>
> >>> Hi
> >>> Thanks for your speedy reply.
> >>> I have just done a retest and changed the code to handle exceptions by
> calling context.yield,
> >>>
> >>> and I now get the following stack trace repeating forever; my
> custom plugin is still never yielded.
> >>>
> >>> What I am trying to develop is a stable NiFi plugin that does not
> consume resources when an exception occurs. Can you help with that as well?
> >>>
> >>> 015-11-17 16:26:50,476 INFO [NiFi Web Server-26]
> c.s.j.s.i.application.WebApplicationImpl Initiating Jersey application,
> version 'Jersey: 1.19 02/11/2015 03:25 AM'
> >>> 2015-11-17 16:27:08,462 INFO [Flow Service Tasks Thread-2]
> o.a.nifi.controller.StandardFlowService Saved flow controller
> org.apache.nifi.controller.FlowController@54c2b852 // Another save
> pending = false
> >>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-4]
> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
> PutFile[id=5135b041-66ff-43ab-9e71-f1848dcef4f6] to run with 1 threads
> >>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-2]
> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
> GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] to run with 1 threads
> >>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-3]
> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
> SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] to run
> with 1 threads
> >>> 2015-11-17 16:27:17,691 INFO [Timer-Driven Process Thread-2]
> o.a.nifi.processors.standard.GetFile
> GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] added
> StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default,
> section=1], offset=0,
> length=15658],offset=0,name=162700882642440.json,size=15658] to flow
> >>> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5]
> c.b.n.p.SampleErrorHandlerProcessor
> [SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e]] ---
> Encountered process exception
> >>> org.apache.nifi.processor.exception.FlowFileHandlingException: null is
> not known in this session (StandardProcessSession[id=35])
> >>>      at
> org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2345)
> ~[nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1773)
> ~[nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> com.barclays.nifi.processor.SampleErrorHandlerProcessor.doTrigger(SampleErrorHandlerProcessor.scala:49)
> [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>>      at
> com.barclays.nifi.processor.ErrorHandlerAdapterProcessor$class.onTrigger(ErrorHandlerAdapterProcessor.scala:24)
> ~[poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>>      at
> com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:26)
> [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
> >>>      at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> [nifi-api-0.3.0.jar:0.3.0]
> >>>      at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
> [nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
> [nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
> [nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
> [nifi-framework-core-0.3.0.jar:0.3.0]
> >>>      at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_51]
> >>>      at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> [na:1.8.0_51]
> >>>      at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> [na:1.8.0_51]
> >>>      at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> [na:1.8.0_51]
> >>>      at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_51]
> >>>      at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_51]
> >>>      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
> >>> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5]
> c.b.n.p.SampleErrorHandlerProcessor
> SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e]
> SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] failed
> to process due to
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default,
> section=1], offset=0,
> length=15658],offset=0,name=162700882642440.json,size=15658] is not known
> in this session
> >>>
> >>>
> >>>> On 17 Nov 2015, at 16:19, Mark Payne <markap14@hotmail.com> wrote:
> >>>>
> >>>> Michael,
> >>>>
> >>>> Hello. First, thanks for your interest in NiFi! I'm very happy to see
> that you're already starting to dig in pretty deep here!
> >>>>
> >>>> I admit that the wording here can be a little confusing. Let me try
> to explain what's going on here and how yielding
> >>>> works. When you are developing a Processor in NiFi, especially if
> reaching out to some external resource,
> >>>> a common occurrence is to realize that there is no work that can
> really be done at this point in time (for
> >>>> example, because the remote resource is not accessible). In that
> case, the processor can call ProcessContext.yield()
> >>>> and as a result, the framework will stop scheduling that processor to
> run for the configured amount of time, so that
> >>>> we aren't wasting resources when we know there is nothing that we can
> accomplish. This is where the configured
> >>>> Yield Duration comes in.
> >>>>
> >>>> There is, however, a concept of "administratively yielding" a
> processor. The idea here is that the Processor throws
> >>>> some unexpected exception. Since the Exception was not of type
> ProcessException, the framework considers this
> >>>> a bug and will perform an administrative yield to avoid continually
> triggering a processor that may be in a bad state.
> >>>>
> >>>> Perhaps using the term "administratively yielding" was a bad idea, as
> it can clearly lead to confusion.
> >>>> If you feel that something is not behaving properly, though, or you
> believe that the user experience can be improved
> >>>> in some way, please do let us know, as we strive to continually
> improve the user experience.
> >>>>
> >>>> Thanks
> >>>> -Mark
> >>>>
> >>>>
> >>>>> On Nov 17, 2015, at 10:34 AM, Michael De Courci <mdecourci@gmail.com>
> wrote:
> >>>>>
> >>>>> FYI
> >>>>>
> >>>>>> Begin forwarded message:
> >>>>>>
> >>>>>> From: Michael De Courci <mdecourci@googlemail.com>
> >>>>>> Date: 16 November 2015 at 15:41:27 GMT
> >>>>>> To: users@nifi.apache.org, Michael.DeCourci@barclayscapital.com,
> Kefah Issa <kefah.issa@barclays.com>
> >>>>>> Subject: NiFI Custom Plugin - Yield Duration Ignored
> >>>>>>
> >>>>>> I wrote a NAR custom plugin to verify NiFi exception handling
> (version: nifi-0.3.0).
> >>>>>>
> >>>>>> The plugin was written to always throw a RuntimeException for any
> exception, including ProcessException.
> >>>>>>
> >>>>>> I created a simple flow; GetFile->MyPlugin->PutFile
> >>>>>>
> >>>>>> and set the Yield duration to 60 seconds.
> >>>>>>
> >>>>>> The log shows that the yield duration is always 1 sec
> >>>>>>
> >>>>>> I debugged the NiFi framework code and see that the yield duration
> always defaults to 1 sec.
> >>>>>> See:
> >>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent
> >>>>>>
> >>>>>> Setting the duration updates the class
> >>>>>> org.apache.nifi.controller.StandardProcessorNode
> >>>>>> but this does not communicate the change to
> TimerDrivenSchedulingAgent
> >>>>>>
> >>>>>> Evidence from log;
> >>>>>>
> >>>>>> 80eb-552a-471c-8ffd-f2509d71fd2e] failed to process session due to
> java.lang.RuntimeException: java.lang.RuntimeException
> >>>>>>
> >>>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7]
> c.b.n.p.SampleErrorHandlerProcessor
> SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e]
> Processor Administratively Yielded for 1 sec due to processing failure
> >>>>>>
> >>>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7]
> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
> SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] due to
> uncaught Exception: java.lang.RuntimeException
> >>>>>>
> >>>>>> 2015-11-15 16:17:58,419 WARN [Timer-Driven Process Thread-7]
> o.a.n.c.t.ContinuallyRunProcessorTask
> >>>>>>
> >>>>>> java.lang.RuntimeException: null
> >>>>>>
> >>>>>>    at
> com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:103)
> ~[na:na]
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> With Kind Regards
> >>>>>> Michael de Courci
> >>>>>
> >>>>
> >>>
> >>
> >
>
>
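
Mark's two points in the thread above (check the result of session.get() for null before calling session.read(), and call ProcessContext.yield() when no useful work can be done) can be sketched roughly as follows. This is a minimal illustration, not the code from the thread. Session, Context, QueueSession, CountingContext and Outcome are hypothetical stand-ins so the example compiles without the NiFi jars, and a String plays the role of a FlowFile. In a real processor the same shape would presumably use ProcessSession and ProcessContext.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch. Session and Context are HYPOTHETICAL stand-ins for NiFi's
// ProcessSession and ProcessContext; they are not the real NiFi interfaces.
final class YieldSketch {

    /** Stand-in for ProcessSession; a String plays the role of a FlowFile. */
    interface Session {
        String get();                  // returns null when nothing is queued
        String read(String flowFile);  // may throw if the content cannot be processed
    }

    /** Stand-in for ProcessContext. */
    interface Context {
        void yield();                  // ask the framework to pause scheduling
    }

    enum Outcome { NO_INPUT, PROCESSED, YIELDED }

    /** One trigger of the processor: guard against null, yield on failure. */
    static Outcome onTrigger(Context context, Session session) {
        String flowFile = session.get();
        if (flowFile == null) {
            // Nothing queued: return instead of passing null to session.read().
            return Outcome.NO_INPUT;
        }
        try {
            session.read(flowFile);
            return Outcome.PROCESSED;
        } catch (RuntimeException e) {
            // Handle the failure here instead of letting an unexpected exception
            // escape, and ask the framework to back off for the Yield Duration.
            context.yield();
            return Outcome.YIELDED;
        }
    }

    /** In-memory session used only to exercise the sketch. */
    static final class QueueSession implements Session {
        private final Deque<String> queue = new ArrayDeque<>();

        QueueSession(String... flowFiles) {
            for (String f : flowFiles) {
                queue.add(f);
            }
        }

        @Override public String get() {
            return queue.poll();       // poll() returns null on an empty queue
        }

        @Override public String read(String flowFile) {
            if (flowFile.isEmpty()) {
                throw new IllegalStateException("cannot process empty content");
            }
            return flowFile.toUpperCase();
        }
    }

    /** Context that only counts how often yield() was requested. */
    static final class CountingContext implements Context {
        int yields = 0;

        @Override public void yield() {
            yields++;
        }
    }

    public static void main(String[] args) {
        CountingContext context = new CountingContext();
        Session session = new QueueSession("payload", "");
        System.out.println(onTrigger(context, session));
        System.out.println(onTrigger(context, session));
        System.out.println(onTrigger(context, session));
        System.out.println("yield calls: " + context.yields);
    }
}
```

With a queue holding one good item and one empty item, three triggers return PROCESSED, YIELDED and NO_INPUT in that order, and yield() is requested once. Whether a failing FlowFile should also be rolled back, penalized, or routed to a failure relationship is a design choice the thread does not settle.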
