nifi-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mark Payne <marka...@hotmail.com>
Subject RE: Writing to a flowfile
Date Mon, 17 Aug 2015 00:22:29 GMT
Dave,

Not a problem. I do remember a note from you about making those processors able to communicate
with AMQP and wanting to contribute that back, which is great! I don't remember seeing a link
to the code, though. I will have to go back and check again.

If I can find that link, I can try to give some more specific advice, but in general when
I create a processor that interacts with an external endpoint, I will create a method that
returns the client that I am using. It's often something super simple like:

protected Client getClient() {
    return client;
}

This way, in my unit test I can simply create a subclass that is able to returned a mocked
out client:

final TestRunner runner = TestRunners.newTestRunner(new MyProcessor() {
    protected Client getClient() {
        return new MockClient();
    }
});

This way, I can easily mock the client out to do whatever I need, including things like throwing
IOException to ensure that it is handled properly.

Again, I'll try to find that link and offer more specific pointers if I can - if you have
the link handy that would be good, in case I'm not able to find the link
that you sent last.

Thanks
-Mark

----------------------------------------
> Date: Sun, 16 Aug 2015 19:01:28 +0000
> From: davidrsmith@btinternet.com
> To: dev@nifi.apache.org
> Subject: Re: Writing to a flowfile
>
> Mark
> Thanks very much for all of your help, that works really well, I have also taken on board
your other comments and implemented them on my home version. I will use it all at work tomorrow.
> As you may have seen on a post I made in July, I have taken the put & get JMS processors
and made a modified version for using with an AMQP broker. They appear to work well and my
boss (John Thorp) would like me to contribute them back to org.apache.nifi.
> Before I can do that I need to write some Junit tests, but I have no idea how to mock
an AMQP broker/queue.To contribute the code for consideration do I need to create my own branch
in the development code, insert my code and then push it back up. Currently my code is on
github (link in July posts) .
> Thanks again for your helpDave
>
>
> On Saturday, 15 August 2015, 22:39, Mark Payne <markap14@hotmail.com> wrote:
>
>
> Dave,
>
> Not a problem.
>
> The FlowFile object itself is immutable. If you want to modify the FlowFile, you do so
by asking
> the session to give you a new version of the FlowFile with some update. For instance,
by adding
> an attribute or changing the content of the FlowFile.
>
> So any call to session.putAttribute or session.write returns a new FlowFile. If you update
> the line that calls putAttribute so that it stores the returned FlowFile into your 'parsed'
variable,
> you should be good to go.
>
> So you would do:
>
> FlowFile parsed = session.create(original);
> parsed = session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());
>
> Otherwise, you end up trying to modify the same version twice (once when you call session.putAttribute
and
> again when you call session.write). This is what the message is complaining about.
>
> Just looking through the code, a few other comments that I would offer:
>
> * the "static boolean error = false;" is likely to cause problems. All instances of your
processor would get the same 'error' variable.
> I would recommend you use an org.apache.nifi.util.BooleanHolder object (defined in the
nifi-utils module) and define
> it within your onTrigger method, rather than using a member variable.
>
> * Experience has shown that with any log message, you should log the FlowFIle that you
are referring to. You can
> also parameterize your log messages. For example:
>
> logger.error("Failed to parse {}; routing to failure", new Object[] {original});
>
> rather than
>
> logger.error("parsing to failure");
>
>
> I hope this helps! Let us know if you're still having problems!
>
> Thanks
> -Mark
>
> ________________________________
>> Date: Sat, 15 Aug 2015 19:41:00 +0000
>> From: davidrsmith@btinternet.com
>> To: dev@nifi.apache.org
>> Subject: Re: Writing to a flowfile
>>
>> Mark
>>
>> Thanks for your help. I have used the snippet of code you sent and it
>> works although I am fairly sure I haven't implemented it correctly, I
>> have had to put all of my code in the OnTrigger method, instead of in
>> the the callback.
>> I also need to change the filename attribute of the parsed flowfile, I
>> have inserted the following line:
>>
>> session.putAttribute(parsed, CoreAttributes.FILENAME.key(),
>> context.getProperty(PARSED_FILENAME).getValue());
>>
>> But it gives me the following error:
>> 2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5]
>> o.a.nifi.processors.standard.ParseMyData
>> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
>> ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process
>> due to org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>> is not the most recent version of this FlowFile within this session
>> (StandardProcessSession[id=21562]); rolling back session:
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
>> is not the most recent version of this FlowFile within this session
>> (StandardProcessSession[id=21562])
>>
>>
>> I have attached my processor class, I would be grateful if you could
>> give it a quick look and tell me what I have done wrong.
>>
>> Many thanks
>> Dave
>>
>>
>>
>> On Saturday, 15 August 2015, 13:16, Mark Payne <markap14@hotmail.com> wrote:
>>
>>
>> David,
>>
>> In this case, since you want to keep the original intact, you will need
>> to create a 'child' flowfile to write to.
>> You do this with ProcessSession.create(FlowFile)
>>
>> So you will have code that looks something like this:
>>
>> final FlowFile original = session.get();
>> if (original == null) {
>> return;
>> }
>>
>> // create a new 'child' FlowFile. The framework will automatically handle
>> // the provenance information so that 'parsed' is forked from 'original'.
>> FlowFile parsed = session.create(original);
>>
>> // Get an OutputStream for the 'parsed' FlowFile
>> parsed = session.write(parsed, new OutputStreamCallback() {
>> public void process(OutputStream parsedOut) {
>>
>> // Get an InputStream for the original
>> session.read(original, new InputStreamCallback() {
>> public void process(InputStream originalIn) {
>> // read from original FlowFile via originalIn
>> // write to new FlowFile via parsedOut
>> }
>> });
>>
>> }
>> });
>>
>> Does this give you what you need? If anything is still unclear, let us know!
>>
>> Thanks
>> -Mark
>>
>> ----------------------------------------
>>> Date: Sat, 15 Aug 2015 10:04:54 +0100
>>> From: davidrsmith@btinternet.com<mailto:davidrsmith@btinternet.com>
>>> Subject: Writing to a flowfile
>>> To: dev@nifi.apache.org<mailto:dev@nifi.apache.org>
>>>
>>>
>>> Hi
>>>
>>> I'm writing a processor which parses a file, I want the parsed file
>> to go to relationship parsed, and the original file to go to
>> relationship original, if the parse was ok.
>>> If the parse fails I want the original file to go to relationship failure.
>>>
>>> I have an inner class which contains a callback which does the
>> parsing. The callback is called from the onTrigger method.
>>> My problem is that I want to read from my original flowFile and write
>> to a new flowFile, but it always seems to write to the original
>> flowfile.
>>> How do I direct my bufferedwriter to my new flowfile?
>>>
>>> Many thanks
>>> Dave
>>>
>>> Sent from Yahoo! Mail on Android
>>>
>>
>>
>>
>
>
>
 		 	   		  
Mime
View raw message