nifi-dev mailing list archives

From Mark Payne <>
Subject Re: Unit test problems with ListFile development
Date Fri, 23 Oct 2015 12:57:25 GMT
Hey Joe,

Sorry it has taken me quite a long time to get back to you. Very much appreciate that you're
taking the time to tackle this, and to make sure that there are legit unit tests in place.
Two thoughts come to mind reading this. First, there's a ticket, NIFI-673 [1], for which I have
submitted a patch that implements ListSFTP / FetchSFTP. The ListSFTP processor extends a newly
created processor, AbstractListProcessor. This is basically ListHDFS as well, but refactored
into an abstract processor with just a few abstract methods. You may want to consider using this
instead, as soon as it is merged into the baseline. It will certainly take away a lot of the
complexity, I think.
The abstract methods that it exposes are:

    /**
     * Creates a Map of attributes that should be applied to the FlowFile to represent this
     * entity. This processor will emit a FlowFile for each "new" entity
     * (see the documentation for this class for a discussion of how this class determines
     * whether or not an entity is "new"). The FlowFile will contain no
     * content. The attributes that will be included are exactly the attributes that are returned
     * by this method.
     *
     * @param entity the entity represented by the FlowFile
     * @param context the ProcessContext for obtaining configuration information
     * @return a Map of attributes for this entity
     */
    protected abstract Map<String, String> createAttributes(final T entity, final ProcessContext context);

    /**
     * Returns the path to perform a listing on.
     * Many resources can be comprised of a "path" (or a "container" or "bucket", etc.) as
     * well as a name or identifier that is unique only
     * within that path. This method is responsible for returning the path that is currently
     * being polled for entities. If this concept
     * does not apply for the concrete implementation, it is recommended that the concrete
     * implementation return "." or "/" for all invocations of this method.
     *
     * @param context the ProcessContext to use in order to obtain configuration
     * @return the path that is to be used to perform the listing, or <code>null</code>
     * if not applicable
     */
    protected abstract String getPath(final ProcessContext context);

    /**
     * Performs a listing of the remote entities that can be pulled. If any entity that is
     * returned has already been "discovered" or "emitted"
     * by this Processor, it will be ignored. A discussion of how the Processor determines
     * those entities that have already been emitted is
     * provided above in the documentation for this class. Any entity that is returned by
     * this method with a timestamp prior to the minTimestamp
     * will be filtered out by the Processor. Therefore, it is not necessary that implementations
     * perform this filtering, but it can be more efficient
     * if the filtering can be performed on the server side prior to retrieving the information.
     *
     * @param context the ProcessContext to use in order to pull the appropriate entities
     * @param minTimestamp the minimum timestamp of entities that should be returned
     * @return a Listing of entities that have a timestamp >= minTimestamp
     */
    protected abstract List<T> performListing(final ProcessContext context, final Long minTimestamp) throws IOException;

    /**
     * Determines whether or not the listing must be reset if the value of the given property
     * is changed.
     *
     * @param property the property that has changed
     * @return <code>true</code> if a change in value of the given property necessitates
     * that the listing be reset, <code>false</code> otherwise
     */
    protected abstract boolean isListingResetNecessary(final PropertyDescriptor property);

The abstract processor is then responsible for distributing this information to the appropriate
controller service, etc.
in order to make your life easier.
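To make that contract more concrete, here is a minimal plain-Java sketch of how a local-file
implementation might fill in those methods. Note this is just an illustration with hypothetical
names (LocalFileLister is not a real NiFi class), stripped of the ProcessContext parameters so
it compiles without NiFi on the classpath:

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for a concrete AbstractListProcessor subclass.
// The real methods take a ProcessContext; here the directory is a plain field.
class LocalFileLister {

    private final String directory;

    LocalFileLister(final String directory) {
        this.directory = directory;
    }

    // Mirrors createAttributes(T entity, ProcessContext context): one attribute map
    // per "new" entity, which becomes a content-less FlowFile.
    Map<String, String> createAttributes(final File entity) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", entity.getName());
        attributes.put("absolute.path", entity.getAbsolutePath());
        attributes.put("file.lastModifiedTime", String.valueOf(entity.lastModified()));
        return attributes;
    }

    // Mirrors getPath(ProcessContext context): the container currently being polled.
    String getPath() {
        return directory;
    }

    // Mirrors performListing(ProcessContext context, Long minTimestamp): entities
    // older than minTimestamp are filtered here for efficiency, though the
    // framework would filter them out again anyway.
    List<File> performListing(final Long minTimestamp) throws IOException {
        final File[] files = new File(directory).listFiles();
        if (files == null) {
            throw new IOException("Could not perform listing of " + directory);
        }
        final List<File> listing = new ArrayList<>();
        for (final File file : files) {
            if (minTimestamp == null || file.lastModified() >= minTimestamp) {
                listing.add(file);
            }
        }
        return listing;
    }
}
```

The point is that the subclass only answers "what is out there and what does it look like";
all of the state tracking and distribution lives in the abstract parent.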

The second thought that I am having is a bit more directly related to your initial question.
Some OS's will reduce the precision of the File.lastModified() date to second-level precision.
I.e., the milliseconds are truncated. If you are trying to hold on to a date/timestamp using a
millisecond-precision field, and then comparing files' last modified times to that, you may
run into the type of problem you are describing here. It is probably worthwhile to check if
the last modified time that you are retrieving always ends in "000". If so, you may need to
truncate the milliseconds (long normalizedTime = lastModified / 1000 * 1000);
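For illustration, that integer-division trick looks like this (the timestamp value below is
just an arbitrary example, not taken from your test):

```java
public class TimestampNormalization {

    // Truncate a millisecond-precision timestamp to second precision, matching
    // filesystems that drop the millisecond component of lastModified().
    static long normalize(final long lastModified) {
        return lastModified / 1000 * 1000; // integer division drops the remainder
    }

    public static void main(String[] args) {
        final long withMillis = 1446000000123L;
        System.out.println(normalize(withMillis)); // prints 1446000000000
    }
}
```

Comparing normalized values on both sides avoids a stored millisecond-precision timestamp
appearing "newer" than a second-precision one read back from the filesystem.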

Please let me know if this helps!


> On Oct 20, 2015, at 8:24 AM, Joe Skora <> wrote:
> All,
> I'm working on unit tests for the ListFile processor but I've run into 2
> problems that I can't resolve.  For background, ListFile is a blatant
> (steal this code) ripoff of ListHDFS but using the filesystem instead of
> HDFS.  My test class is cloned from  TestListHDFS and the problem test
> parallels testNoListUntilUpdateFromRemoteOnPrimaryNodeChange()
> <>.
> The test is supposed to work as follows.
>   1. Create tempFile1
>   2. Call processor run() and validate that 1 flow file processed is
>   tempFile1
>   3. Call clearTransferState() to reset processor
>   4. Create tempFile2
>   5. Trigger primary node change
>   6. Disable cache service
>   7. Call processor run() and confirm that tempFile2 was NOT processed
>   (while service is down)
>   8. Re-enable cache service
>   9. Call processor run() and validate that 1 flow file processed is
>   tempFile2
> So, the problems are:
>   1. After step #9, tempFile1 and tempFile2 flow files are returned, even
>   though none were returned after step #3.
>   2. Even with a 2 second sleep after #3, the timestamps on tempFile1 and
>   tempFile2 are the same, which may be contributing to issue #1.
> Any input appreciated.
> Regards,
> Joe
