nifi-dev mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: Reading all flowfiles queued for a processor (>20000 flowfiles)
Date Wed, 22 Aug 2018 13:54:59 GMT
Hi Sam,

There are a couple of ways to tackle this problem. My recommendation would be to look at
extending BinFiles. This is an abstract processor class, which MergeContent extends (and I
think one or two other processors?). Its job is to bin 'like' FlowFiles together, and it can
take care of pulling data from queues and binning the FlowFiles efficiently. It is important,
though, to keep in mind that FlowFiles contain attribute maps, and those can quickly exhaust
your heap when you're trying to hold tens or hundreds of thousands of FlowFiles in a single
processor.
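
To make the heap concern concrete, here is a minimal sketch in plain Java of pairing by a
correlation key while capping how many unmatched entries are held in memory. It does not use
the NiFi API or BinFiles; PairingBin, offer, evictIfFull and maxPending are hypothetical names
chosen for illustration, and the String ids stand in for FlowFiles.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model (not NiFi code): pairs items by key and bounds the unmatched backlog. */
final class PairingBin {
    private final int maxPending;
    // Insertion-ordered, so the oldest unmatched entry is the first one iterated.
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();

    PairingBin(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be >= 1");
        }
        this.maxPending = maxPending;
    }

    /** Returns the waiting partner's id if one exists; otherwise parks this id. */
    Optional<String> offer(String key, String id) {
        String partner = pending.remove(key);
        if (partner != null) {
            return Optional.of(partner);
        }
        pending.put(key, id);
        return Optional.empty();
    }

    /** Removes and returns the oldest unmatched id when the backlog exceeds the cap. */
    Optional<String> evictIfFull() {
        if (pending.size() <= maxPending) {
            return Optional.empty();
        }
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        String oldest = it.next().getValue();
        it.remove();
        return Optional.of(oldest);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PairingBin bin = new PairingBin(2);
        bin.offer("a", "a-left");
        bin.offer("b", "b-left");
        System.out.println("partner for a-right: " + bin.offer("a", "a-right"));
        System.out.println("still pending: " + bin.pendingCount());
    }
}
```

In a real processor the evicted entry would presumably be routed somewhere (an 'unmatched'
relationship, say) rather than dropped; that part is an assumption and is left out here.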

Thanks
-Mark


On Aug 22, 2018, at 8:07 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <tldzpysami4whpl1@gmail.com> wrote:

Hi NiFi devs,

My understanding is that when I create a custom processor and get FlowFiles with
session.get(CustomFlowFileFilter), the filter will only ever loop through at most 20000
flowfiles, irrespective of how many times it returns FlowFileFilterResult.ACCEPT_AND_CONTINUE
or FlowFileFilterResult.REJECT_AND_CONTINUE. Here 20000 is the value of the
nifi.queue.swap.threshold setting in nifi.properties. (Disregarding that it's actually 19999,
and that the setting is not respected when running tests, which made this SUPER confusing to
debug...)
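
The behaviour described above can be modelled with a toy sketch in plain Java. This is not
NiFi code and makes no claim about NiFi internals: ActiveQueueModel, poll and activeLimit are
hypothetical names, and the local Result enum only mirrors two of the FlowFileFilterResult
values mentioned above. It assumes, as described, that the filter is shown at most
activeLimit queued items, however often it answers "continue".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy model (not NiFi code): a filter only ever sees the first activeLimit items. */
final class ActiveQueueModel {
    /** Local stand-in that mirrors two FlowFileFilterResult names. */
    enum Result { ACCEPT_AND_CONTINUE, REJECT_AND_CONTINUE }

    /** Applies the filter to at most activeLimit items and returns the accepted ones. */
    static List<String> poll(List<String> queued, int activeLimit,
                             Function<String, Result> filter) {
        List<String> accepted = new ArrayList<>();
        int visible = Math.min(activeLimit, queued.size());
        for (int i = 0; i < visible; i++) {
            String item = queued.get(i);
            if (filter.apply(item) == Result.ACCEPT_AND_CONTINUE) {
                accepted.add(item);
            }
        }
        // Items at index >= visible are never offered to the filter in this call.
        return accepted;
    }

    public static void main(String[] args) {
        List<String> queued = Arrays.asList("left-1", "left-2", "left-3", "right-1");
        // With a limit of 3, "right-1" is never seen, so "left-1" cannot be paired.
        List<String> seen = poll(queued, 3,
                item -> item.startsWith("right") ? Result.ACCEPT_AND_CONTINUE
                                                 : Result.REJECT_AND_CONTINUE);
        System.out.println("accepted: " + seen);
    }
}
```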

Attached a screenshot of that happening (also at: https://i.imgur.com/25QJxuj.png)

My question is: is there a way to force a custom processor to read ALL queued flowfiles
in all incoming connections?

My particular use case is pairing flowfiles, and while there probably are other ways to pair
files using Wait/Notify processors, I'm handling files at high throughput, with possible
delays between the pairs arriving, and it's quite easy to hit the limit. I could also increase
the swap threshold setting, but I keep hitting the problem. I've also played with custom
prioritizers on connections in an attempt to maximise the chance of having pairs occur, but
because I need to move unmatched flowfiles out and back in, this essentially creates a busy
loop. Seems like there should be a better way.

Any ideas?

Ideally, a way to force a custom processor to read all queued flowfiles (swapping
more than the threshold into memory during a single onTrigger call) would be the easiest
solution. Is there one?

Cheers,
Sam




