nifi-dev mailing list archives

From Rick Braddy <rbra...@softnas.com>
Subject RE: Transfer relationship not specified (FlowFileHandlingException)
Date Sat, 12 Sep 2015 18:16:48 GMT
I went back and re-read the Developer's Guide to try and understand how incoming FlowFiles
should be handled relative to session transactions.

Currently, my processor uses session.get() to pull a batch of up to 10 incoming flowfiles
and places them into an internal queue.  It never calls session.transfer() for these inbound
flowfiles.  It just reads the contents from each one using session.read() and processes that
input.  Each of these flowfiles contains the path of a directory or of a file to process.

Then it creates up to 10 new outbound flowfiles as a batch, using session.importFrom() and
session.transfer() for each one, then finally calls session.commit().  It is at this point
that the framework raises a FlowFileHandlingException ("transfer relationship not specified")
and fails.  Based upon debugging, it appears the exception is raised against the
first flowfile record that was recorded - the first incoming flowfile, for which session.transfer()
was never called.

Questions:

1. Is it required to "dispose" of incoming flowfiles that have been accessed via session.get()
using session.transfer()?  (See the sketch below.)

2. If so, should the incoming flowfiles be routed to an unused relationship, such as "ORIGINAL"
like the SplitText processor does?

3. I read about the @SupportsBatching annotation, which is not set on my processor (or on the
original GetFile processor, which does not read from input queues).  Given that I am reading
and writing flowfiles in batches, should this annotation be applied?
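
For concreteness, here is a minimal sketch of what I think the framework expects if the answer
to (1) is yes - every flowfile pulled with session.get() is explicitly routed or removed before
commit().  This is only my reading of the Developer's Guide, not code I have running; REL_ORIGINAL
is a hypothetical relationship my processor does not declare yet, and readPathFrom() is a
placeholder for my session.read() logic:

    final List<FlowFile> incoming = session.get(10);   // batch of up to 10 inbound flowfiles
    for (final FlowFile inbound : incoming) {
        final String path = readPathFrom(session, inbound);   // hypothetical helper wrapping session.read()

        // create and route the corresponding outbound flowfile
        FlowFile outbound = session.create();
        outbound = session.importFrom(Paths.get(path), keepingSourceFile, outbound);
        session.transfer(outbound, REL_SUCCESS);

        // dispose of the inbound flowfile so checkpoint() finds a transfer relationship for it
        session.transfer(inbound, REL_ORIGINAL);   // or session.remove(inbound)
    }
    session.commit();

(It may also be that the outbound flowfile should be created as a child via session.create(inbound)
so provenance links the output to the input, but that is a separate question.)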

Thanks
Rick

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com] 
Sent: Saturday, September 12, 2015 8:50 AM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

More information... Upon further debugging within the StandardProcessSession.checkpoint()
method, it's clear that there are 16 items in the "records" list - 10 are incoming flowfiles
that were read and processed, and 6 are the newly-created outbound flowfiles.

The for loop is throwing the exception on the very first record, which is actually one of
the 10 inbound flowfiles that has already been processed:

        // validate that all records have a transfer relationship for them and if so determine
        // the destination node and clone as necessary
        final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
        for (final StandardRepositoryRecord record : records.values()) {
            if (record.isMarkedForDelete()) {
                continue;
            }
            final Relationship relationship = record.getTransferRelationship();
            if (relationship == null) {
                rollback();
 -->            throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
            }

I modified the standard GetFile processor to accept incoming flowfiles that contain a file
path per flowfile, so GetFileData (the new processor) can be triggered to process specific
files.  I did NOT define a specific incoming relationship and just assumed there is one already
available by default.  If there is not, that may be the problem.  There is clearly an inbound
relationship established, as the inbound flowfiles are being read and processed just fine,
but it seems that commit() calling checkpoint() doesn't like what it's seeing overall.

Rick

-----Original Message-----
From: Rick Braddy [mailto:rbraddy@softnas.com]
Sent: Saturday, September 12, 2015 7:42 AM
To: dev@nifi.apache.org
Subject: RE: Transfer relationship not specified (FlowFileHandlingException)

Ok.  Thanks Joe.  The files I'm using are simple .c text and .png image files for testing.

Rick

-----Original Message-----
From: Joe Witt [mailto:joe.witt@gmail.com]
Sent: Saturday, September 12, 2015 7:41 AM
To: dev@nifi.apache.org
Subject: Re: Transfer relationship not specified (FlowFileHandlingException)

Yep.  Looks legit to me.  Will try a unit test with a mixture of flowFiles associated with
content and without.

On Sat, Sep 12, 2015 at 8:12 AM, Rick Braddy <rbraddy@softnas.com> wrote:
> Joe,
>
> Replies below.
>
> Rick
>
>
> -----Original Message-----
> From: Joe Witt [mailto:joe.witt@gmail.com]
> Sent: Saturday, September 12, 2015 7:02 AM
> To: dev@nifi.apache.org
> Subject: Re: Transfer relationship not specified
> (FlowFileHandlingException)
>
> Rick
>
> Can you show what is happening in the exception handling part of your code as well?
>
> Yes.  In this version, sending (empty) flowfiles for directory entries is bypassed so only
> files get processed, and setting attributes is disabled (which did not help).
>
> session.commit() is throwing the exception - no other errors or issues from
> session.importFrom() or session.transfer().
>
> Here's the entire onTrigger method:
>
>     @Override
>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>         final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
>         final ProcessorLog logger = getLogger();
>
>         final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
>         if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
>             try {
>                 final Set<File> filelist = getFileList(context, session);
>
>                 queueLock.lock();
>                 try {
>                     filelist.removeAll(inProcess);
>                     if (!keepingSourceFile) {
>                         filelist.removeAll(recentlyProcessed);
>                     }
>
>                     fileQueue.clear();
>                     fileQueue.addAll(filelist);
>
>                     queueLastUpdated.set(System.currentTimeMillis());
>                     recentlyProcessed.clear();
>
>                     if (filelist.isEmpty()) {
>                         context.yield();
>                     }
>                 } finally {
>                     queueLock.unlock();
>                 }
>             } finally {
>                 filelistLock.unlock();
>             }
>         }
>
>         final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
>         final List<File> files = new ArrayList<>(batchSize);
>         queueLock.lock();
>         try {
>             fileQueue.drainTo(files, batchSize);
>             if (files.isEmpty()) {
>                 return;
>             } else {
>                 inProcess.addAll(files);
>             }
>         } finally {
>             queueLock.unlock();
>         }
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 if (file.isFile()) {
>                     fileType = "file";
>                     flowFile = session.create();
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 } else {
>                     logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
>                     continue; // ******* SKIP DIRECTORIES FOR NOW ****
>                 }
>
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>                 //  flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>                 //  flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>                 //  flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>                 //  flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>                 //  Map<String, String> attributes = getAttributesFromFile(filePath);
>                 //  if (attributes.size() > 0) {
>                 //      flowFile = session.putAllAttributes(flowFile, attributes);
>                 //  }
>
>                 final String fileURI = file.toURI().toString();
>                 session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>             context.yield();
>
>             // anything that we've not already processed needs to be put back on the queue
>             if (flowFile != null) {
>                 session.remove(flowFile);
>             }
>         } finally {
>             queueLock.lock();
>             try {
>                 inProcess.removeAll(files);
>                 recentlyProcessed.addAll(files);
>             } finally {
>                 queueLock.unlock();
>             }
>         }
>     }
>
> }
>
> Also please confirm which codebase you're running against.  Latest HEAD of master?
>
> I'm using a snapshot from GitHub from August 25th, several weeks old
> (it's working fine with the original GetFile processor, which this
> code was derived from).
>
> Thanks
> Joe
>
> On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rbraddy@softnas.com> wrote:
>> So the "transfer relationship not specified" occurs down in the Provenance processing,
where it checks to see if there are flowfile records associated with the session/relationship.
>>
>> There are.  When I inspect session.flowfilesout it's equal to 6, which is the correct
number of calls to importFrom and transfer(), so this confirms that transfer() is called and
did record the outbound flowfiles, yet when the provenance subsystem looks for these records
it does not find them.
>>
>> Not being intimate with the internals of the framework yet, not sure what would case
this.
>>
>> Rick
>>
>> -----Original Message-----
>> From: Rick Braddy
>> Sent: Friday, September 11, 2015 8:26 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Mark,
>>
>> The interesting thing is that session.transfer() is being called, as I have stepped through
>> it in the debugger.  I'm only calling importFrom() for actual files (not directories), as
>> shown below.  This is a modified version of the GetFile processor.
>>
>> Rick
>>
>>         final ListIterator<File> itr = files.listIterator();
>>         FlowFile flowFile = null;
>>         try {
>>             while (itr.hasNext()) {
>>                 final File file = itr.next();
>>                 final Path filePath = file.toPath();
>>                 final Path relativePath = filePath.relativize(filePath.getParent());
>>                 String relativePathString = relativePath.toString() + "/";
>>                 if (relativePathString.isEmpty()) {
>>                     relativePathString = "./";
>>                 }
>>                 final Path absPath = filePath.toAbsolutePath();
>>                 final String absPathString = absPath.getParent().toString() + "/";
>>
>>                 final long importStart = System.nanoTime();
>>                 String fileType = "directory";
>>                 flowFile = session.create();
>>                 if (file.isFile()) {
>>                     fileType = "file";
>>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>>                 }
>>                 final long importNanos = System.nanoTime() - importStart;
>>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>>
>>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>>                 if (attributes.size() > 0) {
>>                     flowFile = session.putAllAttributes(flowFile, attributes);
>>                 }
>>
>>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>>                 session.transfer(flowFile, REL_SUCCESS);
>>                 logger.info("added {} to flow", new Object[]{flowFile});
>>
>>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>>                     queueLock.lock();
>>                     try {
>>                         while (itr.hasNext()) {
>>                             final File nextFile = itr.next();
>>                             fileQueue.add(nextFile);
>>                             inProcess.remove(nextFile);
>>                         }
>>                     } finally {
>>                         queueLock.unlock();
>>                     }
>>                 }
>>             }
>>             session.commit();
>>         } catch (final Exception e) {
>>             logger.error("Failed to transfer files due to {}", e);
>>
>> -----Original Message-----
>> From: Mark Payne [mailto:markap14@hotmail.com]
>> Sent: Friday, September 11, 2015 6:39 PM
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Rick,
>> This error message isn't indicating that there's no Connection for the Relationship, but
>> rather that the FlowFile was never transferred.
>> I.e., there was never a call to session.transfer() for that FlowFile.
>>
>> Thanks,
>> -Mark
>>
>>> From: rbraddy@softnas.com
>>> To: dev@nifi.apache.org
>>> Subject: RE: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>>
>>> Some more details:
>>>
>>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to process session due to
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,
>>>   claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,
>>>   offset=0,name=printargs.c,size=190] is not known in this session
>>> (StandardProcessSession[id=6967]):
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,
>>>   claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,
>>>   offset=0,name=printargs.c,size=190] is not known in this session
>>> (StandardProcessSession[id=6967])
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>>> StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,
>>>   claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,
>>>   offset=0,name=anImage.png,size=16418] to flow
>>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3] c.s.c.processors.files.GetFileData
>>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>>> StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0] to flow ...
>>>
>>> So it seems there's some issue with each of the FlowFiles...
>>>
>>> -----Original Message-----
>>> From: Rick Braddy [mailto:rbraddy@softnas.com]
>>> Sent: Friday, September 11, 2015 6:00 PM
>>> To: dev@nifi.apache.org
>>> Subject: Transfer relationship not specified
>>> (FlowFileHandlingException)
>>>
>>> Hi,
>>>
>>> I have a processor that appears to be creating FlowFiles correctly (a modified standard
>>> processor), but when it goes to commit() the session, an exception is raised:
>>>
>>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6] c.s.c.processors.files.GetFileData
>>> [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to retrieve files due to {}
>>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>>> StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,
>>>   claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,
>>>   offset=244,name=225120878343804,size=42] transfer relationship not specified
>>>
>>> I'm assuming this is supposed to indicate there's no connection available to commit the
>>> transfer; however, there is a "success" relationship registered during init() in the same
>>> way as the original processor did it, and the success relationship output is connected to
>>> another processor's input as it should be.
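>>>
>>> For reference, the success relationship is registered the same way the stock GetFile does
>>> it - a simplified sketch of that part of my processor (not the exact code) looks like this:
>>>
>>>     public static final Relationship REL_SUCCESS = new Relationship.Builder()
>>>             .name("success")
>>>             .description("All files are routed to success")
>>>             .build();
>>>
>>>     private Set<Relationship> relationships;
>>>
>>>     @Override
>>>     protected void init(final ProcessorInitializationContext context) {
>>>         final Set<Relationship> rels = new HashSet<>();
>>>         rels.add(REL_SUCCESS);
>>>         this.relationships = Collections.unmodifiableSet(rels);
>>>     }
>>>
>>>     @Override
>>>     public Set<Relationship> getRelationships() {
>>>         return relationships;
>>>     }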
>>>
>>> Any suggestions for troubleshooting?
>>>
>>> Rick
>>>
>>>
>>