nifi-users mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: Batch Processing
Date Wed, 16 Aug 2017 14:22:34 GMT
Andy,

The ScanAttribute processor allows you to match 1 or more attributes against a dictionary.

Consuming data that is still being written is always a tough problem to tackle. We've seen people
take many different approaches to this. One approach is to have the producer of the data use a
"dot naming" convention, where they write to a file named .myFile.csv and then rename it to
myFile.csv when done. This is often the easiest approach if you control the producers as well.
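
As a rough illustration of the "dot naming" convention on the producer side, here is a minimal
Python sketch. The function name write_then_rename and its parameters are hypothetical, and it
assumes the producer writes to a local filesystem where the hidden file and the final file live
in the same directory, so the rename is a single step as far as readers are concerned.

```python
import os


def write_then_rename(directory, final_name, data):
    """Write data to a hidden dot-file, then rename it to its final name.

    Consumers that skip names starting with "." never see a half-written file.
    """
    hidden_path = os.path.join(directory, "." + final_name)
    final_path = os.path.join(directory, final_name)

    with open(hidden_path, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # make sure the bytes are on disk before renaming

    # os.replace swaps the name in one step when both paths are on the same filesystem.
    os.replace(hidden_path, final_path)
    return final_path
```

The consumer only needs to ignore hidden files for this to work; nothing else about the flow changes.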

A more S3-centric approach is to configure the S3 bucket so that when data is finished being
written to the bucket, S3 sends a notification to SQS. Then you can use GetSQS to get this
notification, use EvaluateXPath, for instance, to extract the information needed, and then
use FetchS3Object.
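
To make the "extract the information needed" step concrete, here is a small Python sketch of
pulling the bucket and object key out of an S3 event notification body as it would arrive from
SQS. S3 delivers these notifications as JSON with URL-encoded object keys; the function name
extract_s3_objects is hypothetical, and the sketch assumes the queue receives the notification
directly from S3 rather than wrapped by another service.

```python
import json
from urllib.parse import unquote_plus


def extract_s3_objects(message_body):
    """Return a list of (bucket, key) pairs from an S3 event notification body."""
    event = json.loads(message_body)
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        bucket = s3["bucket"]["name"]
        # Object keys are URL-encoded in the notification (spaces arrive as "+").
        key = unquote_plus(s3["object"]["key"])
        objects.append((bucket, key))
    return objects
```

In a NiFi flow the same two values would end up as FlowFile attributes that the fetch step reads.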

Thanks
-Mark


On Aug 16, 2017, at 10:13 AM, Andy Loughran <andy@zrmt.com> wrote:

Hi Mark,

Yeah, I think that's what I have now. The issue is that I could end up with a duplicate of a file.

I guess I could use the DetectDuplicate processor to make sure that I de-dupe the FlowFiles
before I increment the counter.  The issue here is that I want the latest available FlowFile
to replace one if it exists (users could update a file's contents before a batch is complete).

Given there are 5 'types', is there a processor that allows me to match a 'type' attribute
against a dictionary?
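
The behaviour described here (check the 'type' against a known set, let the newest file replace
an older one, release once all five types are present) can be sketched outside NiFi as follows.
This is only an illustration of the logic, not a NiFi processor; the class BatchCollector and the
five type names are hypothetical placeholders.

```python
# Hypothetical type names; the thread only says there are five of them.
EXPECTED_TYPES = {"orders", "customers", "products", "invoices", "payments"}


class BatchCollector:
    """Hold files per batch until every expected type has arrived."""

    def __init__(self, expected_types):
        self.expected = set(expected_types)
        self.batches = {}  # batch id -> {type: (last_modified, name)}

    def add(self, batch, file_type, last_modified, name):
        """Record a file; return {type: name} once the batch is complete, else None."""
        if file_type not in self.expected:
            return None  # type is not in the dictionary, so ignore it

        files = self.batches.setdefault(batch, {})
        current = files.get(file_type)
        # The latest upload wins if the same type arrives again before release.
        if current is None or last_modified >= current[0]:
            files[file_type] = (last_modified, name)

        if set(files) == self.expected:
            complete = self.batches.pop(batch)
            return {t: n for t, (_, n) in complete.items()}
        return None
```

Because the check is on the set of types rather than on a count, a re-uploaded file never pushes
a batch over the line early.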

On Wed, 16 Aug 2017 at 15:07 Mark Payne <markap14@hotmail.com> wrote:
Hi Andy and welcome to the community!

I think that what you're doing here seems very reasonable. If you want to wait for 5 'like
flowfiles' instead of just 5 flowfiles, you should be able to use the "Signal Counter Name" of
the Wait processor. For example, if your UpdateAttribute processor creates a 'type' and a 'batch'
attribute, you can set the Wait processor's Signal Counter Name to "${type}" or to
"${type}${batch}", depending on how you want to group them together. This will wait until you
reach 5 flowfiles with the same "type" attribute (or combination of "type" and "batch"
attributes), according to what you set as the Signal Counter Name.
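
As a rough mental model of the grouping (not NiFi's actual implementation), the counter name
works like a dictionary key: each distinct value gets its own count, and a group is released
when its count reaches the target. In the Python sketch below, counter_name and SignalCounts are
hypothetical names, and joining the attribute values mimics what "${type}${batch}" evaluates to.

```python
from collections import Counter


def counter_name(attributes, keys):
    """Build the counter name by concatenating attribute values, like "${type}${batch}"."""
    return "".join(attributes[k] for k in keys)


class SignalCounts:
    """Keep one count per counter name and report when a name reaches the target."""

    def __init__(self, target):
        self.target = target
        self.counts = Counter()

    def notify(self, name):
        """Increment the count for this name; return True once it has reached the target."""
        self.counts[name] += 1
        return self.counts[name] >= self.target
```

For example, with a target of 5, five flowfiles whose attributes give the name "orders42" release
that group, while a flowfile named "customers42" is counted separately.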

Does this make sense?

Thanks
-Mark

> On Aug 16, 2017, at 9:55 AM, Andy Loughran <andy@zrmt.com> wrote:
>
> Hey everyone,
>
> This is my first post.
>
> I'm building out a pipeline with NiFi, but am stuck on an architectural decision around
> some fairly basic design. I think I'm stuck because I'm operating on the wrong paradigm, but the
> application receiving my flow is the limitation in this context.
>
> I'm using ListS3 to poll for CSV files. There need to be 5 different types of file uploaded
> with a unique batch identifier for them to be released. I'm using UpdateAttribute to rip
> the type and batch from the filename, then using Wait to hold the batch.
>
> At the moment though, I'm holding until a batch has 5 files, rather than 5 files with
> each attribute type matching the expected types.
>
> Is this the wrong way to be thinking about this problem, or does this sound like a good
> use case for NiFi, but using a better combination of processors? If anyone could give me
> guidance or point me toward an example template for batch processing I'd be grateful.
>
> Look forward to helping out in the community where I can.
>
> Thanks,
>
> Andy


