nifi-dev mailing list archives

From DAVID SMITH <davidrsm...@btinternet.com>
Subject Re: Writing to a flowfile
Date Sat, 15 Aug 2015 19:41:00 GMT
Mark
Thanks for your help. I have used the snippet of code you sent and it works, although I am
fairly sure I haven't implemented it correctly: I have had to put all of my code in the onTrigger
method instead of in the callback.

I also need to change the filename attribute of the parsed flowfile, so I have inserted the
following line:
session.putAttribute(parsed, CoreAttributes.FILENAME.key(), context.getProperty(PARSED_FILENAME).getValue());

But it gives me the following error:

2015-08-15 21:28:55,628 ERROR [Timer-Driven Process Thread-5]
o.a.nifi.processors.standard.ParseMyData ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b]
ParseMyData[id=63ef3e50-cf02-4a2f-b8f7-1415b39e521b] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session (StandardProcessSession[id=21562]);
rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=6912ef5c-4bbe-414f-9a97-39c584b7a284,claim=,offset=0,name=testFile2.txt,size=0]
is not the most recent version of this FlowFile within this session (StandardProcessSession[id=21562])


I have attached my processor class. I would be grateful if you could give it a quick look
and tell me what I have done wrong.

Many thanks
Dave
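
The error text says that a FlowFile object handed to the session was not its most recent version. FlowFile objects are immutable, and ProcessSession methods such as putAttribute and write return a new FlowFile, so one plausible cause (an assumption, since the attached class is not reproduced here) is that a return value was discarded and the older reference was used afterwards. The sketch below is a toy model of that behaviour only: ToyFlowFile and ToySession are hypothetical stand-ins, not NiFi classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy stand-in, NOT the NiFi API: an immutable record with a version counter.
final class ToyFlowFile {
    final long id;
    final int version;
    final Map<String, String> attributes;

    ToyFlowFile(long id, int version, Map<String, String> attributes) {
        this.id = id;
        this.version = version;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }
}

// Toy stand-in, NOT the NiFi API: remembers the latest version of each record.
final class ToySession {
    private final Map<Long, ToyFlowFile> latest = new HashMap<>();
    private long nextId = 1;

    ToyFlowFile create() {
        ToyFlowFile ff = new ToyFlowFile(nextId++, 0, new HashMap<String, String>());
        latest.put(ff.id, ff);
        return ff;
    }

    // Returns a NEW object; the reference passed in becomes stale.
    ToyFlowFile putAttribute(ToyFlowFile ff, String key, String value) {
        requireLatest(ff);
        Map<String, String> attrs = new HashMap<>(ff.attributes);
        attrs.put(key, value);
        ToyFlowFile updated = new ToyFlowFile(ff.id, ff.version + 1, attrs);
        latest.put(ff.id, updated);
        return updated;
    }

    void transfer(ToyFlowFile ff) {
        requireLatest(ff);
    }

    private void requireLatest(ToyFlowFile ff) {
        if (latest.get(ff.id) != ff) {
            throw new IllegalStateException(
                "not the most recent version of this FlowFile within this session");
        }
    }
}
```

In this model, calling putAttribute(parsed, ...) without reassigning and then calling transfer(parsed) fails, whereas parsed = putAttribute(parsed, ...) followed by transfer(parsed) succeeds. If the real processor follows the same pattern, reassigning the result of each session call may be enough to clear the error.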


     On Saturday, 15 August 2015, 13:16, Mark Payne <markap14@hotmail.com> wrote:
   

 David,

In this case, since you want to keep the original intact, you will need to create a 'child'
flowfile to write to.
You do this with ProcessSession.create(FlowFile)

So you will have code that looks something like this:

final FlowFile original = session.get();
if (original == null) {
  return;
}

// create a new 'child' FlowFile. The framework will automatically handle
// the provenance information so that 'parsed' is forked from 'original'.
FlowFile parsed = session.create(original);

// Get an OutputStream for the 'parsed' FlowFile
parsed = session.write(parsed, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream parsedOut) throws IOException {

        // Get an InputStream for the original
        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream originalIn) throws IOException {
                // read from original FlowFile via originalIn
                // write to new FlowFile via parsedOut
            }
        });

    }
});
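
The two comment lines in the inner callback could be filled in along the following lines. This is only a sketch using plain java.io classes: the class name StreamParseSketch and the "parsing" rule (trim each line, drop blank ones) are made up for illustration, and it assumes the caller owns both streams and closes them, so the writer is flushed rather than closed.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

class StreamParseSketch {

    // Hypothetical parse step: trims each line and drops blank ones.
    static void parse(InputStream originalIn, OutputStream parsedOut) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(originalIn, StandardCharsets.UTF_8));
        // The BufferedWriter wraps the stream of the NEW output, not the original.
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(parsedOut, StandardCharsets.UTF_8));

        String line;
        while ((line = reader.readLine()) != null) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                writer.write(trimmed);
                writer.write('\n');
            }
        }
        // Flush buffered output, but leave closing to whoever owns the streams.
        writer.flush();
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
                "  a \n\n b\n".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        parse(in, out);
        System.out.print(out.toString("UTF-8"));
    }
}
```

Inside the inner callback this would be called as StreamParseSketch.parse(originalIn, parsedOut), so the BufferedWriter is directed at the child FlowFile simply by wrapping parsedOut.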

Does this give you what you need? If anything is still unclear, let us know!

Thanks
-Mark

----------------------------------------
> Date: Sat, 15 Aug 2015 10:04:54 +0100
> From: davidrsmith@btinternet.com
> Subject: Writing to a flowfile
> To: dev@nifi.apache.org
>
>
> Hi
>
> I'm writing a processor which parses a file. If the parse is ok, I want the parsed file
> to go to relationship parsed and the original file to go to relationship original.
> If the parse fails I want the original file to go to relationship failure.
>
> I have an inner class which contains a callback which does the parsing. The callback
> is called from the onTrigger method.
> My problem is that I want to read from my original flowFile and write to a new flowFile,
> but it always seems to write to the original flowFile.
> How do I direct my BufferedWriter to my new flowFile?
>
> Many thanks
> Dave