nifi-dev mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: NiFI Custom Plugin - Yield Duration Ignored
Date Tue, 17 Nov 2015 22:40:04 GMT
Michael,

I would be happy to take a look. It doesn't look like any attachment came through, though.
It's quite possible that the Apache mail server stripped the attachment. Could you try
pasting the code into the email? That may be the simplest approach. Alternatively, you
could use something like Pastebin.

Thanks
-Mark


> On Nov 17, 2015, at 1:22 PM, Michael De Courci <mdecourci@gmail.com> wrote:
> 
> I have attached the code for you to review. I think it is checking for null in a base class, ErrorHandlerAdapterProcessor.
> It is written in Scala, but it is not that different from a Java implementation.
> 
> 
> Michael
> 
> 
> 
> 
> 
> 
>> On 17 Nov 2015, at 18:16, Mark Payne <markap14@hotmail.com> wrote:
>> 
>> Michael,
>> 
>> Not a problem. It looks like you are calling session.read() but passing it a null FlowFile.
>> 
>> My guess is that in your processor you're doing something like this:
>> 
>> FlowFile flowFile = session.get();
>> ...
>> session.read(flowFile, new InputStreamCallback() {...});
>> 
>> It's important to always check whether the FlowFile returned from session.get() is null.
>> Because NiFi allows for multi-threading and allows Processors to be run without incoming
>> connections, session.get() may well return null.
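
The null check described above can be sketched in plain Java. This is a minimal sketch and not NiFi code: `NullCheckSketch`, `FakeSession`, and its `enqueue`/`get`/`read` methods are hypothetical stand-ins that only mimic the behaviour discussed in this thread (get() returning null when nothing is queued, read() rejecting a null FlowFile). In a real processor the same guard would presumably sit directly after session.get().

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-ins for illustration only; these are NOT the NiFi API.
final class NullCheckSketch {

    /** Stand-in for a session whose get() returns null when nothing is queued. */
    static final class FakeSession {
        private final Queue<String> queue = new ArrayDeque<>();

        void enqueue(String content) {
            queue.add(content);
        }

        /** Returns the next queued item, or null if the queue is empty. */
        String get() {
            return queue.poll();
        }

        /** Mimics the failure in the stack trace: reading a null item is an error. */
        int read(String flowFile) {
            if (flowFile == null) {
                throw new IllegalStateException("null is not known in this session");
            }
            return flowFile.length();
        }
    }

    /** Returns the number of characters read, or -1 if there was nothing to do. */
    static int onTrigger(FakeSession session) {
        String flowFile = session.get();
        if (flowFile == null) {
            return -1; // nothing queued: return early instead of calling read(null)
        }
        return session.read(flowFile);
    }

    public static void main(String[] args) {
        FakeSession session = new FakeSession();
        System.out.println(onTrigger(session)); // empty queue, prints -1
        session.enqueue("hello");
        System.out.println(onTrigger(session)); // one item queued, prints 5
    }
}
```

Without the early return, the first call would reach read(null) and throw, which is the shape of the failure in the stack trace quoted further down.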
>> 
>> If that is not the case, please let us know, and we will continue to help you investigate.
>> 
>> Thanks!
>> 
>> -Mark
>> 
>> 
>>> On Nov 17, 2015, at 11:42 AM, Michael De Courci <mdecourci@gmail.com> wrote:
>>> 
>>> Hi
>>> Thanks for your speedy reply.
>>> I have just done a retest and changed the code to handle exceptions by calling context.yield()
>>> 
>>> and I now get the following stack trace repeating forever; my custom plugin still never yields.
>>> 
>>> What I am trying to develop is a stable NiFi plugin that does not consume resources when an exception occurs. Can you help with that also?
>>> 
>>> 2015-11-17 16:26:50,476 INFO [NiFi Web Server-26] c.s.j.s.i.application.WebApplicationImpl Initiating Jersey application, version 'Jersey: 1.19 02/11/2015 03:25 AM'
>>> 2015-11-17 16:27:08,462 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@54c2b852 // Another save pending = false
>>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-4] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=5135b041-66ff-43ab-9e71-f1848dcef4f6] to run with 1 threads
>>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] to run with 1 threads
>>> 2015-11-17 16:27:17,676 INFO [pool-35-thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] to run with 1 threads
>>> 2015-11-17 16:27:17,691 INFO [Timer-Driven Process Thread-2] o.a.nifi.processors.standard.GetFile GetFile[id=2886b8ff-1f82-4aa3-b42d-1f4f4c9f03f8] added StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default, section=1], offset=0, length=15658],offset=0,name=162700882642440.json,size=15658] to flow
>>> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5] c.b.n.p.SampleErrorHandlerProcessor [SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e]] --- Encountered process exception
>>> org.apache.nifi.processor.exception.FlowFileHandlingException: null is not known in this session (StandardProcessSession[id=35])
>>>      at org.apache.nifi.controller.repository.StandardProcessSession.validateRecordState(StandardProcessSession.java:2345) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1773) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at com.barclays.nifi.processor.SampleErrorHandlerProcessor.doTrigger(SampleErrorHandlerProcessor.scala:49) [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
>>>      at com.barclays.nifi.processor.ErrorHandlerAdapterProcessor$class.onTrigger(ErrorHandlerAdapterProcessor.scala:24) ~[poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
>>>      at com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:26) [poc-1.0-SNAPSHOT-nifiPlugins.nar-unpacked/:na]
>>>      at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-0.3.0.jar:0.3.0]
>>>      at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_51]
>>>      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_51]
>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_51]
>>>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_51]
>>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_51]
>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_51]
>>>      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
>>> 2015-11-17 16:27:17,743 ERROR [Timer-Driven Process Thread-5] c.b.n.p.SampleErrorHandlerProcessor SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=6a1800eb-03ca-4aad-9e2c-f96df41252e6,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1447777637680-1, container=default, section=1], offset=0, length=15658],offset=0,name=162700882642440.json,size=15658] is not known in this session

>>> 
>>> 
>>>> On 17 Nov 2015, at 16:19, Mark Payne <markap14@hotmail.com> wrote:
>>>> 
>>>> Michael,
>>>> 
>>>> Hello. First, thanks for your interest in NiFi! I'm very happy to see that you're
>>>> already starting to dig in pretty deep here!
>>>> 
>>>> I admit that the wording here can be a little confusing. Let me try to explain what's
>>>> going on here and how yielding works. When you are developing a Processor in NiFi,
>>>> especially if reaching out to some external resource, a common occurrence is to realize
>>>> that there is no work that can really be done at this point in time (for example,
>>>> because the remote resource is not accessible). In that case, the processor can call
>>>> ProcessContext.yield() and as a result, the framework will stop scheduling that
>>>> processor to run for the configured amount of time, so that we aren't wasting resources
>>>> when we know there is nothing that we can accomplish. This is where the configured
>>>> Yield Duration comes in.
>>>> 
>>>> There is, however, a concept of "administratively yielding" a processor. The idea here
>>>> is that the Processor throws some unexpected exception. Since the Exception was not of
>>>> type ProcessException, the framework considers this a bug and will perform an
>>>> administrative yield to avoid continually triggering a processor that may be in a bad state.
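
The difference between the two kinds of yield can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions, not the framework's implementation: `YieldSketch`, `FakeContext`, `FakeProcessor`, `yieldProcessor()` and `runOnce()` are hypothetical names, the two durations simply mirror the values mentioned in this thread (60 seconds configured, 1 second administrative), and every uncaught RuntimeException is treated as unexpected, so the ProcessException case is not modelled.

```java
// Hypothetical stand-ins for illustration only; these are NOT the NiFi API.
final class YieldSketch {

    /** Stand-in for the context handed to a processor. */
    static final class FakeContext {
        boolean yielded = false;

        /** Plays the role of ProcessContext.yield() in this model. */
        void yieldProcessor() {
            yielded = true;
        }
    }

    /** Stand-in for a processor with a single trigger method. */
    interface FakeProcessor {
        void onTrigger(FakeContext context);
    }

    // Assumed values mirroring the thread: 60 s configured, 1 s administrative.
    static final long CONFIGURED_YIELD_MILLIS = 60_000L;
    static final long ADMIN_YIELD_MILLIS = 1_000L;

    /** Runs the processor once and returns how long this model would wait before the next run. */
    static long runOnce(FakeProcessor processor) {
        FakeContext context = new FakeContext();
        try {
            processor.onTrigger(context);
        } catch (RuntimeException unexpected) {
            // Uncaught exception: the model applies the administrative yield,
            // regardless of the processor's configured Yield Duration.
            return ADMIN_YIELD_MILLIS;
        }
        // The processor asked to yield: the configured Yield Duration applies.
        return context.yielded ? CONFIGURED_YIELD_MILLIS : 0L;
    }

    public static void main(String[] args) {
        // Lets the exception escape onTrigger.
        FakeProcessor throwing = ctx -> {
            throw new RuntimeException("remote resource unavailable");
        };
        // Catches the failure itself and asks to be yielded.
        FakeProcessor yielding = ctx -> {
            try {
                throw new RuntimeException("remote resource unavailable");
            } catch (RuntimeException e) {
                ctx.yieldProcessor();
            }
        };
        System.out.println(runOnce(throwing)); // prints 1000
        System.out.println(runOnce(yielding)); // prints 60000
    }
}
```

In this model, the 60-second setting only takes effect when the processor handles the failure itself and requests the yield, which matches the explanation above of why the original report saw 1 sec in the log.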
>>>> 
>>>> Perhaps using the term "administratively yielding" was a bad idea, as it can clearly
>>>> lead to confusion. If you feel that something is not behaving properly, though, or you
>>>> believe that the user experience can be improved in some way, please do let us know,
>>>> as we strive to continually improve the user experience.
>>>> 
>>>> Thanks
>>>> -Mark
>>>> 
>>>> 
>>>>> On Nov 17, 2015, at 10:34 AM, Michael De Courci <mdecourci@gmail.com> wrote:
>>>>> 
>>>>> FYI
>>>>> 
>>>>>> Begin forwarded message:
>>>>>> 
>>>>>> From: Michael De Courci <mdecourci@googlemail.com>
>>>>>> Date: 16 November 2015 at 15:41:27 GMT
>>>>>> To: users@nifi.apache.org, Michael.DeCourci@barclayscapital.com, Kefah Issa <kefah.issa@barclays.com>
>>>>>> Subject: NiFI Custom Plugin - Yield Duration Ignored
>>>>>> 
>>>>>> I wrote a custom NAR plugin to verify NiFi exception handling (version: nifi-0.3.0).
>>>>>> 
>>>>>> The plugin was written to always throw a RuntimeException for any exception, including ProcessException.
>>>>>> 
>>>>>> I created a simple flow; GetFile->MyPlugin->PutFile 
>>>>>> 
>>>>>> and set the Yield duration to 60 seconds.
>>>>>> 
>>>>>> The log shows that the yield duration is always 1 sec
>>>>>> 
>>>>>> I debugged the NiFi framework code and see that the yield duration always defaults to 1 sec;
>>>>>> See;
>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent
>>>>>> 
>>>>>> Setting the duration updates the class 
>>>>>> org.apache.nifi.controller.StandardProcessorNode
>>>>>> but this does not communicate the change to TimerDrivenSchedulingAgent
>>>>>> 
>>>>>> Evidence from log;
>>>>>> 
>>>>>> 80eb-552a-471c-8ffd-f2509d71fd2e] failed to process session due to java.lang.RuntimeException: java.lang.RuntimeException
>>>>>> 
>>>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7] c.b.n.p.SampleErrorHandlerProcessor SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] Processor Administratively Yielded for 1 sec due to processing failure
>>>>>> 
>>>>>> 2015-11-15 16:17:58,417 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding SampleErrorHandlerProcessor[id=8e8f80eb-552a-471c-8ffd-f2509d71fd2e] due to uncaught Exception: java.lang.RuntimeException
>>>>>> 
>>>>>> 2015-11-15 16:17:58,419 WARN [Timer-Driven Process Thread-7] o.a.n.c.t.ContinuallyRunProcessorTask
>>>>>> 
>>>>>> java.lang.RuntimeException: null
>>>>>> 
>>>>>>    at com.barclays.nifi.processor.SampleErrorHandlerProcessor.onTrigger(SampleErrorHandlerProcessor.scala:103) ~[na:na]
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> With Kind Regards
>>>>>> Michael de Courci
>>>>> 
>>>> 
>>> 
>> 
> 

