nifi-users mailing list archives

From: Mark Payne
Subject: Re: Batch Processing
Date: Wed, 16 Aug 2017 14:22:34 GMT

The ScanAttribute processor allows you to match 1 or more attributes against a dictionary.

Consuming data that is still being written is always a tough problem to tackle. We've seen people
take many different approaches to this. One approach is to have the producer of the data use a
"dot naming" convention, where they write to a file named .myFile.csv and then rename it
to myFile.csv when done. This is often the easiest approach if you control the producers as
well.
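
As a rough illustration of the dot-naming convention on the producer side, here is a minimal
Python sketch. It assumes the producer writes to a local or shared filesystem; the function
name write_then_rename is made up for this example and is not part of NiFi.

```python
import os

def write_then_rename(directory, final_name, data):
    """Write to a hidden dot-file, then rename it into place once complete."""
    hidden_path = os.path.join(directory, "." + final_name)
    final_path = os.path.join(directory, final_name)
    with open(hidden_path, "w", newline="") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before renaming
    # os.replace is atomic when both paths are on the same filesystem,
    # so a consumer never sees a half-written myFile.csv.
    os.replace(hidden_path, final_path)
    return final_path
```

A consumer that skips names starting with "." then only ever picks up finished files.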

A more S3-centric approach is to configure the S3 bucket so that when data is finished being
written to the bucket, S3 sends a notification to SQS. You can then use GetSQS to receive this
notification, use EvaluateJsonPath, for instance, to extract the information needed (S3 event
notifications are JSON), and use FetchS3Object to retrieve the file.
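
To show which fields the extraction step needs, here is a small Python sketch that pulls the
bucket and key out of an S3 event notification body. S3 event notifications are JSON with a
"Records" array; the function name extract_s3_objects is hypothetical, and the sketch assumes
the notification goes straight to SQS (a message routed through SNS is wrapped in an extra
envelope that would need unwrapping first).

```python
import json
from urllib.parse import unquote_plus

def extract_s3_objects(message_body):
    """Return (bucket, key) pairs from an S3 event notification body."""
    event = json.loads(message_body)
    results = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        bucket = s3["bucket"]["name"]
        # Object keys are URL-encoded in the notification (spaces arrive as "+").
        key = unquote_plus(s3["object"]["key"])
        results.append((bucket, key))
    return results
```

In a flow, the same two values would end up as FlowFile attributes that the fetch step reads.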


On Aug 16, 2017, at 10:13 AM, Andy Loughran wrote:

Hi Mark,

Yeah, I think that's what I have now.  The issue being that I could end up with a duplicate
of a file.

I guess I could use the DetectDuplicate processor to make sure that I de-dupe the FlowFiles
before I increment the counter.  The issue here is that I want the latest available FlowFile
to replace one if it exists (users could update a file's contents before a batch is complete).
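
The "latest one wins" behaviour described here can be sketched in plain Python. This is only
a model of the logic, not a NiFi processor: the BatchTracker class and the type names in the
usage example are invented for illustration, since the thread never names the five types.

```python
class BatchTracker:
    """Hold files per batch, keep only the newest file of each type,
    and release the batch once every expected type is present."""

    def __init__(self, expected_types):
        self.expected = frozenset(expected_types)
        self.batches = {}  # batch id -> {type: latest file}

    def add(self, batch, file_type, flowfile):
        if file_type not in self.expected:
            return None  # unknown type: ignore it
        files = self.batches.setdefault(batch, {})
        files[file_type] = flowfile  # a re-upload overwrites the older copy
        if self.expected.issubset(files):
            return self.batches.pop(batch)  # complete: release and forget
        return None
```

Because the files are keyed by (batch, type), a duplicate never inflates the count; it just
replaces the earlier entry.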

Given there are 5 'types', is there a processor that allows me to match a 'type' attribute
against a dictionary?

On Wed, 16 Aug 2017 at 15:07, Mark Payne wrote:
Hi Andy and welcome to the community!

I think that what you're doing here seems very reasonable. If you want to wait for 5 'like
flowfiles' instead of just 5 flowfiles, you should be able to use the "Signal Counter Name" of
the Wait processor. For example, if your UpdateAttribute processor creates a 'type' and a 'batch'
attribute, you can set the Wait processor's Signal Counter Name to "${type}" or to
"${type}${batch}", depending on how you want to group them together. This will wait until you
reach 5 flowfiles with the same "type" attribute (or combination of "type" and "batch"
attributes), according to what you set as the Signal Counter Name.
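
To make the grouping concrete, here is a small Python model of how a counter name built from
attributes splits flowfiles into groups. It only imitates the counting; the helper names
(counter_name, count_signals, ready_groups) are invented for this sketch and are not NiFi APIs.

```python
from collections import Counter

def counter_name(attributes, group_by=("type",)):
    """Imitate an expression such as ${type} or ${type}${batch}
    by concatenating the named attribute values."""
    return "".join(attributes[name] for name in group_by)

def count_signals(flowfiles, group_by=("type",)):
    """Count flowfiles per counter name."""
    counts = Counter()
    for attrs in flowfiles:
        counts[counter_name(attrs, group_by)] += 1
    return counts

def ready_groups(counts, target=5):
    """Return the counter names that have reached the target count."""
    return sorted(name for name, n in counts.items() if n >= target)
```

Grouping by ("type",) counts all flowfiles of one type together, while ("type", "batch")
keeps a separate count for each type within each batch.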

Does this make sense?


> On Aug 16, 2017, at 9:55 AM, Andy Loughran wrote:
> Hey everyone,
> This is my first post.
> I'm building out a pipeline with NiFi, but am stuck on an architectural decision around
> some fairly basic design.  I think I'm stuck as I'm operating on the wrong paradigm, but the
> application receiving my flow is the limitation in this context.
> I'm using ListS3 to poll for csv files.  There need to be 5 different types of file uploaded
> with a unique batch identifier for them to be released.  I'm using UpdateAttribute to rip
> the type and batch from the filename, then using Wait to hold the batch.
> At the moment though, I'm holding until a batch has 5 files, rather than 5 files with
> each attribute type matching the expected types.
> Is this the wrong way to be thinking about this problem, or does this sound like a good
> use case for NiFi, but using a better combination of processors?  If anyone could give me
> guidance or point me toward an example template for batch processing I'd be grateful.
> Look forward to helping out in the community where I can.
> Thanks,
> Andy
